Skip to content

Commit 790f81b

Browse files
Address review comments
1 parent 284ad40 commit 790f81b

File tree

7 files changed

+107
-49
lines changed

7 files changed

+107
-49
lines changed

extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.hadoop.conf.Configuration;
2424
import org.apache.iceberg.catalog.Catalog;
2525
import org.apache.iceberg.hadoop.HadoopCatalog;
26-
import org.apache.polaris.core.catalog.NonRESTCatalogFactory;
26+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
2727
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
2828
import org.apache.polaris.core.connection.AuthenticationType;
2929
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
@@ -35,7 +35,7 @@
3535
/** Factory class for creating a Hadoop catalog handle based on connection configuration. */
3636
@ApplicationScoped
3737
@Identifier("hadoop")
38-
public class HadoopFederatedCatalogFactory implements NonRESTCatalogFactory {
38+
public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory {
3939
private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class);
4040

4141
@Override

polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRESTCatalogFactory.java renamed to polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@
2323
import org.apache.polaris.core.secrets.UserSecretsManager;
2424

2525
/**
26-
* Factory interface for creating non-REST catalog handles based on connection configuration.
26+
* Factory interface for creating external catalog handles based on connection configuration.
2727
*
2828
* <p>Implementations should be annotated with CDI annotations and use the @Identifier annotation to
2929
* specify which connection type they support.
3030
*/
31-
public interface NonRESTCatalogFactory {
31+
public interface ExternalCatalogFactory {
3232

3333
/**
34-
* Creates a catalog hadnle for the given non-REST connection configuration.
34+
* Creates a catalog handle for the given connection configuration.
3535
*
3636
* @param connectionConfig the connection configuration
3737
* @param userSecretsManager the user secrets manager for handling credentials

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
import org.apache.iceberg.rest.responses.LoadTableResponse;
6464
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
6565
import org.apache.polaris.core.auth.PolarisAuthorizer;
66-
import org.apache.polaris.core.catalog.NonRESTCatalogFactory;
66+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
6767
import org.apache.polaris.core.context.CallContext;
6868
import org.apache.polaris.core.context.RealmContext;
6969
import org.apache.polaris.core.entity.PolarisEntity;
@@ -147,7 +147,7 @@ public class IcebergCatalogAdapter
147147
private final CatalogPrefixParser prefixParser;
148148
private final ReservedProperties reservedProperties;
149149
private final CatalogHandlerUtils catalogHandlerUtils;
150-
private final Instance<NonRESTCatalogFactory> nonRESTCatalogFactories;
150+
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
151151

152152
@Inject
153153
public IcebergCatalogAdapter(
@@ -162,7 +162,7 @@ public IcebergCatalogAdapter(
162162
CatalogPrefixParser prefixParser,
163163
ReservedProperties reservedProperties,
164164
CatalogHandlerUtils catalogHandlerUtils,
165-
@Any Instance<NonRESTCatalogFactory> nonRESTCatalogFactories) {
165+
@Any Instance<ExternalCatalogFactory> externalCatalogFactories) {
166166
this.realmContext = realmContext;
167167
this.callContext = callContext;
168168
this.catalogFactory = catalogFactory;
@@ -174,7 +174,7 @@ public IcebergCatalogAdapter(
174174
this.prefixParser = prefixParser;
175175
this.reservedProperties = reservedProperties;
176176
this.catalogHandlerUtils = catalogHandlerUtils;
177-
this.nonRESTCatalogFactories = nonRESTCatalogFactories;
177+
this.externalCatalogFactories = externalCatalogFactories;
178178
}
179179

180180
/**
@@ -212,7 +212,7 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String
212212
polarisAuthorizer,
213213
reservedProperties,
214214
catalogHandlerUtils,
215-
nonRESTCatalogFactories);
215+
externalCatalogFactories);
216216
}
217217

218218
@Override

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.iceberg.UpdateRequirement;
4848
import org.apache.iceberg.catalog.Catalog;
4949
import org.apache.iceberg.catalog.Namespace;
50-
import org.apache.iceberg.catalog.SessionCatalog;
5150
import org.apache.iceberg.catalog.SupportsNamespaces;
5251
import org.apache.iceberg.catalog.TableIdentifier;
5352
import org.apache.iceberg.catalog.ViewCatalog;
@@ -56,8 +55,6 @@
5655
import org.apache.iceberg.exceptions.CommitFailedException;
5756
import org.apache.iceberg.exceptions.ForbiddenException;
5857
import org.apache.iceberg.exceptions.NoSuchTableException;
59-
import org.apache.iceberg.rest.HTTPClient;
60-
import org.apache.iceberg.rest.RESTCatalog;
6158
import org.apache.iceberg.rest.credentials.ImmutableCredential;
6259
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
6360
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
@@ -76,11 +73,10 @@
7673
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
7774
import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
7875
import org.apache.polaris.core.auth.PolarisAuthorizer;
79-
import org.apache.polaris.core.catalog.NonRESTCatalogFactory;
76+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
8077
import org.apache.polaris.core.config.FeatureConfiguration;
8178
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
8279
import org.apache.polaris.core.connection.ConnectionType;
83-
import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
8480
import org.apache.polaris.core.context.CallContext;
8581
import org.apache.polaris.core.entity.CatalogEntity;
8682
import org.apache.polaris.core.entity.PolarisEntitySubType;
@@ -130,7 +126,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
130126
private final ReservedProperties reservedProperties;
131127
private final CatalogHandlerUtils catalogHandlerUtils;
132128

133-
private final Instance<NonRESTCatalogFactory> nonRESTCatalogFactories;
129+
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
134130

135131
// Catalog instance will be initialized after authorizing resolver successfully resolves
136132
// the catalog entity.
@@ -152,14 +148,14 @@ public IcebergCatalogHandler(
152148
PolarisAuthorizer authorizer,
153149
ReservedProperties reservedProperties,
154150
CatalogHandlerUtils catalogHandlerUtils,
155-
Instance<NonRESTCatalogFactory> nonRESTCatalogFactories) {
151+
Instance<ExternalCatalogFactory> externalCatalogFactories) {
156152
super(callContext, resolutionManifestFactory, securityContext, catalogName, authorizer);
157153
this.metaStoreManager = metaStoreManager;
158154
this.userSecretsManager = userSecretsManager;
159155
this.catalogFactory = catalogFactory;
160156
this.reservedProperties = reservedProperties;
161157
this.catalogHandlerUtils = catalogHandlerUtils;
162-
this.nonRESTCatalogFactories = nonRESTCatalogFactories;
158+
this.externalCatalogFactories = externalCatalogFactories;
163159
}
164160

165161
/**
@@ -221,34 +217,31 @@ protected void initializeCatalog() {
221217
Catalog federatedCatalog;
222218
ConnectionType connectionType =
223219
ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode());
220+
221+
// Use the unified factory pattern for all external catalog types
222+
String factoryIdentifier;
224223
switch (connectionType) {
225224
case ICEBERG_REST:
226-
SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty();
227-
federatedCatalog =
228-
new RESTCatalog(
229-
context,
230-
(config) ->
231-
HTTPClient.builder(config)
232-
.uri(config.get(org.apache.iceberg.CatalogProperties.URI))
233-
.build());
234-
federatedCatalog.initialize(
235-
((IcebergRestConnectionConfigInfoDpo) connectionConfigInfoDpo).getRemoteCatalogName(),
236-
connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
225+
factoryIdentifier = "iceberg-rest";
237226
break;
238227
case HADOOP:
239-
// Use CDI to select the Hadoop federation factory at runtime
240-
Instance<NonRESTCatalogFactory> hadoopFactory =
241-
nonRESTCatalogFactories.select(Identifier.Literal.of("hadoop"));
242-
if (!hadoopFactory.isUnsatisfied()) {
243-
federatedCatalog =
244-
hadoopFactory.get().createCatalog(connectionConfigInfoDpo, getUserSecretsManager());
245-
} else {
246-
throw new UnsupportedOperationException("Hadoop federation factory unavailable.");
247-
}
228+
factoryIdentifier = "hadoop";
248229
break;
249230
default:
250231
throw new UnsupportedOperationException("Unsupported connection type: " + connectionType);
251232
}
233+
234+
Instance<ExternalCatalogFactory> externalCatalogFactory =
235+
externalCatalogFactories.select(Identifier.Literal.of(factoryIdentifier));
236+
if (!externalCatalogFactory.isUnsatisfied()) {
237+
federatedCatalog =
238+
externalCatalogFactory
239+
.get()
240+
.createCatalog(connectionConfigInfoDpo, getUserSecretsManager());
241+
} else {
242+
throw new UnsupportedOperationException(
243+
"External catalog factory for type '" + connectionType + "' is unavailable.");
244+
}
252245
this.baseCatalog = federatedCatalog;
253246
} else {
254247
LOGGER.atInfo().log("Initializing non-federated catalog");
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.service.catalog.iceberg;
20+
21+
import io.smallrye.common.annotation.Identifier;
22+
import jakarta.enterprise.context.ApplicationScoped;
23+
import org.apache.iceberg.catalog.Catalog;
24+
import org.apache.iceberg.catalog.SessionCatalog;
25+
import org.apache.iceberg.rest.HTTPClient;
26+
import org.apache.iceberg.rest.RESTCatalog;
27+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
28+
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
29+
import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
30+
import org.apache.polaris.core.secrets.UserSecretsManager;
31+
32+
/** Factory class for creating an Iceberg REST catalog handle based on connection configuration. */
33+
@ApplicationScoped
34+
@Identifier("iceberg-rest")
35+
public class IcebergRESTExternalCatalogFactory implements ExternalCatalogFactory {
36+
37+
@Override
38+
public Catalog createCatalog(
39+
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager) {
40+
if (!(connectionConfig instanceof IcebergRestConnectionConfigInfoDpo)) {
41+
throw new IllegalArgumentException(
42+
"Expected IcebergRestConnectionConfigInfoDpo but got: "
43+
+ connectionConfig.getClass().getSimpleName());
44+
}
45+
46+
IcebergRestConnectionConfigInfoDpo icebergConfig =
47+
(IcebergRestConnectionConfigInfoDpo) connectionConfig;
48+
49+
SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty();
50+
RESTCatalog federatedCatalog =
51+
new RESTCatalog(
52+
context,
53+
(config) ->
54+
HTTPClient.builder(config)
55+
.uri(config.get(org.apache.iceberg.CatalogProperties.URI))
56+
.build());
57+
58+
federatedCatalog.initialize(
59+
icebergConfig.getRemoteCatalogName(),
60+
connectionConfig.asIcebergCatalogProperties(userSecretsManager));
61+
62+
return federatedCatalog;
63+
}
64+
}

runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
import org.apache.polaris.core.admin.model.PrincipalWithCredentialsCredentials;
5757
import org.apache.polaris.core.admin.model.StorageConfigInfo;
5858
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
59-
import org.apache.polaris.core.catalog.NonRESTCatalogFactory;
59+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
6060
import org.apache.polaris.core.context.CallContext;
6161
import org.apache.polaris.core.entity.CatalogEntity;
6262
import org.apache.polaris.core.entity.CatalogRoleEntity;
@@ -82,8 +82,8 @@
8282
public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase {
8383

8484
@SuppressWarnings("unchecked")
85-
private static Instance<NonRESTCatalogFactory> emptyNonRESTCatalogFactory() {
86-
Instance<NonRESTCatalogFactory> mock = Mockito.mock(Instance.class);
85+
private static Instance<ExternalCatalogFactory> emptyExternalCatalogFactory() {
86+
Instance<ExternalCatalogFactory> mock = Mockito.mock(Instance.class);
8787
Mockito.when(mock.select(Mockito.any())).thenReturn(mock);
8888
Mockito.when(mock.isUnsatisfied()).thenReturn(true);
8989
return mock;
@@ -112,7 +112,7 @@ private IcebergCatalogHandler newWrapper(
112112
polarisAuthorizer,
113113
reservedProperties,
114114
catalogHandlerUtils,
115-
emptyNonRESTCatalogFactory());
115+
emptyExternalCatalogFactory());
116116
}
117117

118118
/**
@@ -254,7 +254,7 @@ public void testInsufficientPermissionsPriorToSecretRotation() {
254254
polarisAuthorizer,
255255
reservedProperties,
256256
catalogHandlerUtils,
257-
emptyNonRESTCatalogFactory());
257+
emptyExternalCatalogFactory());
258258

259259
// a variety of actions are all disallowed because the principal's credentials must be rotated
260260
doTestInsufficientPrivileges(
@@ -290,7 +290,7 @@ public void testInsufficientPermissionsPriorToSecretRotation() {
290290
polarisAuthorizer,
291291
reservedProperties,
292292
catalogHandlerUtils,
293-
emptyNonRESTCatalogFactory());
293+
emptyExternalCatalogFactory());
294294

295295
doTestSufficientPrivilegeSets(
296296
List.of(Set.of(PolarisPrivilege.NAMESPACE_LIST)),

runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.polaris.core.PolarisDiagnostics;
3838
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
3939
import org.apache.polaris.core.auth.PolarisAuthorizer;
40-
import org.apache.polaris.core.catalog.NonRESTCatalogFactory;
40+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
4141
import org.apache.polaris.core.config.PolarisConfigurationStore;
4242
import org.apache.polaris.core.context.CallContext;
4343
import org.apache.polaris.core.context.RealmContext;
@@ -217,9 +217,10 @@ public TestServices build() {
217217
CatalogHandlerUtils catalogHandlerUtils =
218218
new CatalogHandlerUtils(callContext.getRealmConfig());
219219

220-
Instance<NonRESTCatalogFactory> nonRESTCatalogFactory = Mockito.mock(Instance.class);
221-
Mockito.when(nonRESTCatalogFactory.select(Mockito.any())).thenReturn(nonRESTCatalogFactory);
222-
Mockito.when(nonRESTCatalogFactory.isUnsatisfied()).thenReturn(true);
220+
@SuppressWarnings("unchecked")
221+
Instance<ExternalCatalogFactory> externalCatalogFactory = Mockito.mock(Instance.class);
222+
Mockito.when(externalCatalogFactory.select(Mockito.any())).thenReturn(externalCatalogFactory);
223+
Mockito.when(externalCatalogFactory.isUnsatisfied()).thenReturn(true);
223224

224225
IcebergCatalogAdapter catalogService =
225226
new IcebergCatalogAdapter(
@@ -234,7 +235,7 @@ public TestServices build() {
234235
new DefaultCatalogPrefixParser(),
235236
reservedProperties,
236237
catalogHandlerUtils,
237-
nonRESTCatalogFactory);
238+
externalCatalogFactory);
238239

239240
IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(catalogService);
240241
IcebergRestConfigurationApi restConfigurationApi =

0 commit comments

Comments
 (0)