Skip to content

Commit aaedb72

Browse files
committed
Support GCS vended credentials from Iceberg REST catalog
1 parent 3851f43 commit aaedb72

File tree

9 files changed

+1072
-9
lines changed

9 files changed

+1072
-9
lines changed

lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,23 +76,40 @@ public class GcsFileSystem
7676
private final long writeBlockSizeBytes;
7777
private final int pageSize;
7878
private final int batchSize;
79+
private final Optional<EncryptionKey> defaultEncryptionKey;
80+
private final Optional<EncryptionKey> defaultDecryptionKey;
7981

8082
public GcsFileSystem(ListeningExecutorService executorService, Storage storage, int readBlockSizeBytes, long writeBlockSizeBytes, int pageSize, int batchSize)
83+
{
84+
this(executorService, storage, readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize, Optional.empty(), Optional.empty());
85+
}
86+
87+
public GcsFileSystem(
88+
ListeningExecutorService executorService,
89+
Storage storage,
90+
int readBlockSizeBytes,
91+
long writeBlockSizeBytes,
92+
int pageSize,
93+
int batchSize,
94+
Optional<EncryptionKey> defaultEncryptionKey,
95+
Optional<EncryptionKey> defaultDecryptionKey)
8196
{
8297
this.executorService = requireNonNull(executorService, "executorService is null");
8398
this.storage = requireNonNull(storage, "storage is null");
8499
this.readBlockSizeBytes = readBlockSizeBytes;
85100
this.writeBlockSizeBytes = writeBlockSizeBytes;
86101
this.pageSize = pageSize;
87102
this.batchSize = batchSize;
103+
this.defaultEncryptionKey = requireNonNull(defaultEncryptionKey, "defaultEncryptionKey is null");
104+
this.defaultDecryptionKey = requireNonNull(defaultDecryptionKey, "defaultDecryptionKey is null");
88105
}
89106

90107
@Override
91108
public TrinoInputFile newInputFile(Location location)
92109
{
93110
GcsLocation gcsLocation = new GcsLocation(location);
94111
checkIsValidFile(gcsLocation);
95-
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty(), Optional.empty(), Optional.empty());
112+
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty(), Optional.empty(), defaultDecryptionKey);
96113
}
97114

98115
@Override
@@ -108,7 +125,7 @@ public TrinoInputFile newInputFile(Location location, long length)
108125
{
109126
GcsLocation gcsLocation = new GcsLocation(location);
110127
checkIsValidFile(gcsLocation);
111-
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.empty(), Optional.empty());
128+
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.empty(), defaultDecryptionKey);
112129
}
113130

114131
@Override
@@ -124,7 +141,7 @@ public TrinoInputFile newInputFile(Location location, long length, Instant lastM
124141
{
125142
GcsLocation gcsLocation = new GcsLocation(location);
126143
checkIsValidFile(gcsLocation);
127-
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.of(lastModified), Optional.empty());
144+
return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.of(lastModified), defaultDecryptionKey);
128145
}
129146

130147
@Override
@@ -140,7 +157,7 @@ public TrinoOutputFile newOutputFile(Location location)
140157
{
141158
GcsLocation gcsLocation = new GcsLocation(location);
142159
checkIsValidFile(gcsLocation);
143-
return new GcsOutputFile(gcsLocation, storage, writeBlockSizeBytes, Optional.empty());
160+
return new GcsOutputFile(gcsLocation, storage, writeBlockSizeBytes, defaultEncryptionKey);
144161
}
145162

146163
@Override

lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@
1717
import com.google.inject.Inject;
1818
import io.trino.filesystem.TrinoFileSystem;
1919
import io.trino.filesystem.TrinoFileSystemFactory;
20+
import io.trino.filesystem.encryption.EncryptionKey;
2021
import io.trino.spi.security.ConnectorIdentity;
2122
import jakarta.annotation.PreDestroy;
2223

24+
import java.util.Base64;
25+
import java.util.Optional;
26+
2327
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
2428
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
29+
import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY;
30+
import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY;
2531
import static java.lang.Math.toIntExact;
2632
import static java.util.Objects.requireNonNull;
2733
import static java.util.concurrent.Executors.newCachedThreadPool;
@@ -56,6 +62,18 @@ public void stop()
5662
@Override
5763
public TrinoFileSystem create(ConnectorIdentity identity)
5864
{
59-
return new GcsFileSystem(executorService, storageFactory.create(identity), readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize);
65+
Optional<EncryptionKey> defaultEncryptionKey = extractEncryptionKey(identity, EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY);
66+
Optional<EncryptionKey> defaultDecryptionKey = extractEncryptionKey(identity, EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY);
67+
return new GcsFileSystem(executorService, storageFactory.create(identity), readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize, defaultEncryptionKey, defaultDecryptionKey);
68+
}
69+
70+
private static Optional<EncryptionKey> extractEncryptionKey(ConnectorIdentity identity, String extraCredentialKey)
71+
{
72+
String base64Key = identity.getExtraCredentials().get(extraCredentialKey);
73+
if (base64Key == null) {
74+
return Optional.empty();
75+
}
76+
byte[] keyBytes = Base64.getDecoder().decode(base64Key);
77+
return Optional.of(new EncryptionKey(keyBytes, "AES256"));
6078
}
6179
}

lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
package io.trino.filesystem.gcs;
1515

1616
import com.google.api.gax.retrying.RetrySettings;
17+
import com.google.auth.oauth2.AccessToken;
18+
import com.google.auth.oauth2.GoogleCredentials;
19+
import com.google.cloud.NoCredentials;
1720
import com.google.cloud.storage.Storage;
1821
import com.google.cloud.storage.StorageOptions;
1922
import com.google.inject.Inject;
@@ -22,11 +25,18 @@
2225
import java.io.IOException;
2326
import java.io.UncheckedIOException;
2427
import java.time.Duration;
28+
import java.util.Date;
2529
import java.util.Map;
2630
import java.util.Optional;
2731

2832
import static com.google.cloud.storage.StorageRetryStrategy.getUniformStorageRetryStrategy;
2933
import static com.google.common.net.HttpHeaders.USER_AGENT;
34+
import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY;
35+
import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY;
36+
import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY;
37+
import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY;
38+
import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY;
39+
import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY;
3040
import static java.util.Objects.requireNonNull;
3141

3242
public class GcsStorageFactory
@@ -59,14 +69,45 @@ public GcsStorageFactory(GcsFileSystemConfig config, GcsAuth gcsAuth)
5969
public Storage create(ConnectorIdentity identity)
6070
{
6171
try {
72+
Map<String, String> extraCredentials = identity.getExtraCredentials();
73+
boolean noAuth = Boolean.parseBoolean(extraCredentials.getOrDefault(EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY, "false"));
74+
String vendedOAuthToken = extraCredentials.get(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY);
75+
6276
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder();
63-
if (projectId != null) {
64-
storageOptionsBuilder.setProjectId(projectId);
77+
78+
String effectiveProjectId = extraCredentials.getOrDefault(EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY, projectId);
79+
if (effectiveProjectId != null) {
80+
storageOptionsBuilder.setProjectId(effectiveProjectId);
6581
}
6682

67-
gcsAuth.setAuth(storageOptionsBuilder, identity);
83+
if (noAuth) {
84+
storageOptionsBuilder.setCredentials(NoCredentials.getInstance());
85+
}
86+
else if (vendedOAuthToken != null) {
87+
Date expirationTime = null;
88+
String expiresAt = extraCredentials.get(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY);
89+
if (expiresAt != null) {
90+
expirationTime = new Date(Long.parseLong(expiresAt));
91+
}
92+
AccessToken accessToken = new AccessToken(vendedOAuthToken, expirationTime);
93+
storageOptionsBuilder.setCredentials(GoogleCredentials.create(accessToken));
94+
}
95+
else {
96+
gcsAuth.setAuth(storageOptionsBuilder, identity);
97+
}
6898

69-
endpoint.ifPresent(storageOptionsBuilder::setHost);
99+
String vendedServiceHost = extraCredentials.get(EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY);
100+
if (vendedServiceHost != null) {
101+
storageOptionsBuilder.setHost(vendedServiceHost);
102+
}
103+
else {
104+
endpoint.ifPresent(storageOptionsBuilder::setHost);
105+
}
106+
107+
String vendedUserProject = extraCredentials.get(EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY);
108+
if (vendedUserProject != null) {
109+
storageOptionsBuilder.setQuotaProjectId(vendedUserProject);
110+
}
70111

71112
// Note: without uniform strategy we cannot retry idempotent operations.
72113
// The trino-filesystem api does not violate the conditions for idempotency, see https://cloud.google.com/storage/docs/retry-strategy#java for details.

0 commit comments

Comments
 (0)