From 0563e937ca47b4a37fdd7eb8602666251d1dd343 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 13:46:20 -0700 Subject: [PATCH 1/6] Modularize federation (Option 2) --- extensions/federation/hadoop/build.gradle.kts | 57 ++++++++++++++++ .../hadoop/HadoopFederatedCatalogFactory.java | 61 +++++++++++++++++ gradle/projects.main.properties | 1 + .../core/catalog/ExternalCatalogFactory.java | 43 ++++++++++++ .../core/connection/ConnectionType.java | 21 ++++++ runtime/service/build.gradle.kts | 1 + .../iceberg/IcebergCatalogAdapter.java | 11 +++- .../iceberg/IcebergCatalogHandler.java | 65 ++++++------------- .../IcebergRESTExternalCatalogFactory.java | 65 +++++++++++++++++++ .../IcebergCatalogHandlerAuthzTest.java | 19 +++++- .../apache/polaris/service/TestServices.java | 10 ++- 11 files changed, 303 insertions(+), 51 deletions(-) create mode 100644 extensions/federation/hadoop/build.gradle.kts create mode 100644 extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java diff --git a/extensions/federation/hadoop/build.gradle.kts b/extensions/federation/hadoop/build.gradle.kts new file mode 100644 index 0000000000..431da94e52 --- /dev/null +++ b/extensions/federation/hadoop/build.gradle.kts @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("polaris-client") + alias(libs.plugins.jandex) +} + +dependencies { + // Polaris dependencies + implementation(project(":polaris-core")) + + implementation(platform(libs.iceberg.bom)) + implementation("org.apache.iceberg:iceberg-api") + implementation("org.apache.iceberg:iceberg-core") + implementation("org.apache.iceberg:iceberg-common") + + // Hadoop dependencies (for Hadoop catalog support) + implementation(libs.hadoop.common) { + exclude("org.slf4j", "slf4j-reload4j") + exclude("org.slf4j", "slf4j-log4j12") + exclude("ch.qos.reload4j", "reload4j") + exclude("log4j", "log4j") + exclude("org.apache.zookeeper", "zookeeper") + exclude("org.apache.hadoop.thirdparty", "hadoop-shaded-protobuf_3_25") + exclude("com.github.pjfanning", "jersey-json") + exclude("com.sun.jersey", "jersey-core") + exclude("com.sun.jersey", "jersey-server") + exclude("com.sun.jersey", "jersey-servlet") + exclude("io.dropwizard.metrics", "metrics-core") + } + implementation(libs.hadoop.client.api) + implementation(libs.hadoop.client.runtime) + + // CDI dependencies for runtime discovery + implementation(libs.jakarta.enterprise.cdi.api) + implementation(libs.smallrye.common.annotation) + + // Logging + implementation(libs.slf4j.api) +} diff --git a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java new file mode 100644 index 0000000000..50294da99c --- /dev/null +++ b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extensions.federation.hadoop; + +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; +import org.apache.polaris.core.connection.AuthenticationParametersDpo; +import org.apache.polaris.core.connection.AuthenticationType; +import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.ConnectionType; +import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; +import org.apache.polaris.core.secrets.UserSecretsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Factory class for creating a Hadoop catalog handle based on connection configuration. */ +@ApplicationScoped +@Identifier(ConnectionType.HADOOP_FACTORY_IDENTIFIER) +public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class); + + @Override + public Catalog createCatalog( + ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { + // Currently, Polaris supports Hadoop federation only via IMPLICIT authentication. + // Hence, prior to initializing the configuration, ensure that the catalog uses + // IMPLICIT authentication. + AuthenticationParametersDpo authenticationParametersDpo = + connectionConfigInfoDpo.getAuthenticationParameters(); + if (authenticationParametersDpo.getAuthenticationTypeCode() + != AuthenticationType.IMPLICIT.getCode()) { + throw new IllegalStateException("Hadoop federation only supports IMPLICIT authentication."); + } + Configuration conf = new Configuration(); + String warehouse = ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(); + HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehouse); + hadoopCatalog.initialize( + warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); + return hadoopCatalog; + } +} diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 39ab227411..1b74232b59 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -42,6 +42,7 @@ polaris-minio-testcontainer=tools/minio-testcontainer polaris-version=tools/version polaris-misc-types=tools/misc-types polaris-persistence-varint=nosql/persistence/varint +polaris-extensions-federation-hadoop=extensions/federation/hadoop polaris-config-docs-annotations=tools/config-docs/annotations polaris-config-docs-generator=tools/config-docs/generator diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java new file mode 100644 index 0000000000..59c8903753 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.catalog; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.secrets.UserSecretsManager; + +/** + * Factory interface for creating external catalog handles based on connection configuration. + * + *

Implementations should be annotated with CDI annotations and use the @Identifier annotation to + * specify which connection type they support. + */ +public interface ExternalCatalogFactory { + + /** + * Creates a catalog handle for the given connection configuration. + * + * @param connectionConfig the connection configuration + * @param userSecretsManager the user secrets manager for handling credentials + * @return the initialized catalog + * @throws IllegalStateException if the connection configuration is invalid + */ + Catalog createCatalog( + ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager); +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java index 441c0c4c53..2e09366a31 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java @@ -35,6 +35,9 @@ public enum ConnectionType { HADOOP(2), ; + public static final String ICEBERG_REST_FACTORY_IDENTIFIER = "ICEBERG_REST"; + public static final String HADOOP_FACTORY_IDENTIFIER = "HADOOP"; + private static final ConnectionType[] REVERSE_MAPPING_ARRAY; static { @@ -77,4 +80,22 @@ public enum ConnectionType { public int getCode() { return this.code; } + + /** + * Get the factory identifier string used for CDI injection of the appropriate + * ExternalCatalogFactory. + * + * @return the factory identifier string + */ + public String getFactoryIdentifier() { + switch (this) { + case ICEBERG_REST: + return ICEBERG_REST_FACTORY_IDENTIFIER; + case HADOOP: + return HADOOP_FACTORY_IDENTIFIER; + default: + throw new UnsupportedOperationException( + "No factory identifier for connection type: " + this); + } + } } diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index 6a3cc092b9..a121f15ee3 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -30,6 +30,7 @@ dependencies { implementation(project(":polaris-api-management-service")) implementation(project(":polaris-api-iceberg-service")) implementation(project(":polaris-api-catalog-service")) + runtimeOnly(project(":polaris-extensions-federation-hadoop")) runtimeOnly(project(":polaris-relational-jdbc")) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 99aea64d9b..ac079e71d9 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import jakarta.enterprise.context.RequestScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.HttpHeaders; @@ -61,6 +63,7 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisEntity; @@ -144,6 +147,7 @@ public class IcebergCatalogAdapter private final CatalogPrefixParser prefixParser; private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; + private final Instance externalCatalogFactories; @Inject public IcebergCatalogAdapter( @@ -157,7 +161,8 @@ public IcebergCatalogAdapter( PolarisAuthorizer polarisAuthorizer, CatalogPrefixParser prefixParser, ReservedProperties reservedProperties, - CatalogHandlerUtils catalogHandlerUtils) { + CatalogHandlerUtils catalogHandlerUtils, + @Any Instance externalCatalogFactories) { this.realmContext = realmContext; this.callContext = callContext; this.catalogFactory = catalogFactory; @@ -169,6 +174,7 @@ public IcebergCatalogAdapter( this.prefixParser = prefixParser; this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; + this.externalCatalogFactories = externalCatalogFactories; } /** @@ -205,7 +211,8 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String catalogName, polarisAuthorizer, reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + externalCatalogFactories); } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 4395467d1d..e771cbc16b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -20,7 +20,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import io.smallrye.common.annotation.Identifier; import jakarta.annotation.Nonnull; +import jakarta.enterprise.inject.Instance; import jakarta.ws.rs.core.SecurityContext; import java.io.Closeable; import java.time.OffsetDateTime; @@ -46,7 +48,6 @@ import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.ViewCatalog; @@ -55,9 +56,6 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.rest.HTTPClient; -import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.credentials.ImmutableCredential; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; @@ -76,13 +74,10 @@ import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.polaris.core.auth.PolarisAuthorizableOperation; import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.config.FeatureConfiguration; -import org.apache.polaris.core.connection.AuthenticationParametersDpo; -import org.apache.polaris.core.connection.AuthenticationType; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; import org.apache.polaris.core.connection.ConnectionType; -import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; -import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; @@ -132,6 +127,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; + private final Instance externalCatalogFactories; + // Catalog instance will be initialized after authorizing resolver successfully resolves // the catalog entity. protected Catalog baseCatalog = null; @@ -151,13 +148,15 @@ public IcebergCatalogHandler( String catalogName, PolarisAuthorizer authorizer, ReservedProperties reservedProperties, - CatalogHandlerUtils catalogHandlerUtils) { + CatalogHandlerUtils catalogHandlerUtils, + Instance externalCatalogFactories) { super(callContext, resolutionManifestFactory, securityContext, catalogName, authorizer); this.metaStoreManager = metaStoreManager; this.userSecretsManager = userSecretsManager; this.catalogFactory = catalogFactory; this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; + this.externalCatalogFactories = externalCatalogFactories; } /** @@ -220,42 +219,18 @@ protected void initializeCatalog() { ConnectionType connectionType = ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode()); - switch (connectionType) { - case ICEBERG_REST: - SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty(); - federatedCatalog = - new RESTCatalog( - context, - (config) -> - HTTPClient.builder(config) - .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) - .build()); - federatedCatalog.initialize( - ((IcebergRestConnectionConfigInfoDpo) connectionConfigInfoDpo).getRemoteCatalogName(), - connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager())); - break; - case HADOOP: - // Currently, Polaris supports Hadoop federation only via IMPLICIT authentication. - // Hence, prior to initializing the configuration, ensure that the catalog uses - // IMPLICIT authentication. - AuthenticationParametersDpo authenticationParametersDpo = - connectionConfigInfoDpo.getAuthenticationParameters(); - if (authenticationParametersDpo.getAuthenticationTypeCode() - != AuthenticationType.IMPLICIT.getCode()) { - throw new IllegalStateException( - "Hadoop federation only supports IMPLICIT authentication."); - } - Configuration conf = new Configuration(); - String warehouse = - ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(); - federatedCatalog = new HadoopCatalog(conf, warehouse); - federatedCatalog.initialize( - warehouse, - connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager())); - break; - default: - throw new UnsupportedOperationException( - "Connection type not supported: " + connectionType); + // Use the unified factory pattern for all external catalog types + Instance externalCatalogFactory = + externalCatalogFactories.select( + Identifier.Literal.of(connectionType.getFactoryIdentifier())); + if (!externalCatalogFactory.isUnsatisfied()) { + federatedCatalog = + externalCatalogFactory + .get() + .createCatalog(connectionConfigInfoDpo, getUserSecretsManager()); + } else { + throw new UnsupportedOperationException( + "External catalog factory for type '" + connectionType + "' is unavailable."); } this.baseCatalog = federatedCatalog; } else { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java new file mode 100644 index 0000000000..05de201c37 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.iceberg; + +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; +import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.ConnectionType; +import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; +import org.apache.polaris.core.secrets.UserSecretsManager; + +/** Factory class for creating an Iceberg REST catalog handle based on connection configuration. */ +@ApplicationScoped +@Identifier(ConnectionType.ICEBERG_REST_FACTORY_IDENTIFIER) +public class IcebergRESTExternalCatalogFactory implements ExternalCatalogFactory { + + @Override + public Catalog createCatalog( + ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager) { + if (!(connectionConfig instanceof IcebergRestConnectionConfigInfoDpo)) { + throw new IllegalArgumentException( + "Expected IcebergRestConnectionConfigInfoDpo but got: " + + connectionConfig.getClass().getSimpleName()); + } + + IcebergRestConnectionConfigInfoDpo icebergConfig = + (IcebergRestConnectionConfigInfoDpo) connectionConfig; + + SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty(); + RESTCatalog federatedCatalog = + new RESTCatalog( + context, + (config) -> + HTTPClient.builder(config) + .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) + .build()); + + federatedCatalog.initialize( + icebergConfig.getRemoteCatalogName(), + connectionConfig.asIcebergCatalogProperties(userSecretsManager)); + + return federatedCatalog; + } +} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java index ea963784b3..90d7341a95 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; +import jakarta.enterprise.inject.Instance; import jakarta.ws.rs.core.SecurityContext; import java.time.Instant; import java.util.List; @@ -55,6 +56,7 @@ import org.apache.polaris.core.admin.model.PrincipalWithCredentialsCredentials; import org.apache.polaris.core.admin.model.StorageConfigInfo; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.CatalogRoleEntity; @@ -79,6 +81,14 @@ @TestProfile(PolarisAuthzTestBase.Profile.class) public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { + @SuppressWarnings("unchecked") + private static Instance emptyExternalCatalogFactory() { + Instance mock = Mockito.mock(Instance.class); + Mockito.when(mock.select(Mockito.any())).thenReturn(mock); + Mockito.when(mock.isUnsatisfied()).thenReturn(true); + return mock; + } + private IcebergCatalogHandler newWrapper() { return newWrapper(Set.of()); } @@ -101,7 +111,8 @@ private IcebergCatalogHandler newWrapper( catalogName, polarisAuthorizer, reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + emptyExternalCatalogFactory()); } /** @@ -242,7 +253,8 @@ public void testInsufficientPermissionsPriorToSecretRotation() { CATALOG_NAME, polarisAuthorizer, reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + emptyExternalCatalogFactory()); // a variety of actions are all disallowed because the principal's credentials must be rotated doTestInsufficientPrivileges( @@ -277,7 +289,8 @@ public void testInsufficientPermissionsPriorToSecretRotation() { CATALOG_NAME, polarisAuthorizer, reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + emptyExternalCatalogFactory()); doTestSufficientPrivilegeSets( List.of(Set.of(PolarisPrivilege.NAMESPACE_LIST)), diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index d8ec777889..e7288c441e 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -22,6 +22,7 @@ import com.google.auth.oauth2.GoogleCredentials; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import jakarta.enterprise.inject.Instance; import jakarta.ws.rs.core.SecurityContext; import java.security.Principal; import java.time.Clock; @@ -36,6 +37,7 @@ import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; @@ -215,6 +217,11 @@ public TestServices build() { CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(callContext.getRealmConfig()); + @SuppressWarnings("unchecked") + Instance externalCatalogFactory = Mockito.mock(Instance.class); + Mockito.when(externalCatalogFactory.select(Mockito.any())).thenReturn(externalCatalogFactory); + Mockito.when(externalCatalogFactory.isUnsatisfied()).thenReturn(true); + IcebergCatalogAdapter catalogService = new IcebergCatalogAdapter( realmContext, @@ -227,7 +234,8 @@ public TestServices build() { authorizer, new DefaultCatalogPrefixParser(), reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + externalCatalogFactory); IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(catalogService); IcebergRestConfigurationApi restConfigurationApi = From ba56f270c87beab50b5fbab7392c199009756fff Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 13:54:08 -0700 Subject: [PATCH 2/6] Move polaris-extensions-federation-hadoop dependency --- runtime/server/build.gradle.kts | 1 + runtime/service/build.gradle.kts | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/server/build.gradle.kts b/runtime/server/build.gradle.kts index 2ebd153812..c645e0bc0a 100644 --- a/runtime/server/build.gradle.kts +++ b/runtime/server/build.gradle.kts @@ -48,6 +48,7 @@ dependencies { runtimeOnly("org.postgresql:postgresql") runtimeOnly(project(":polaris-relational-jdbc")) runtimeOnly("io.quarkus:quarkus-jdbc-postgresql") + runtimeOnly(project(":polaris-extensions-federation-hadoop")) // enforce the Quarkus _platform_ here, to get a consistent and validated set of dependencies implementation(enforcedPlatform(libs.quarkus.bom)) diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index a121f15ee3..6a3cc092b9 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -30,7 +30,6 @@ dependencies { implementation(project(":polaris-api-management-service")) implementation(project(":polaris-api-iceberg-service")) implementation(project(":polaris-api-catalog-service")) - runtimeOnly(project(":polaris-extensions-federation-hadoop")) runtimeOnly(project(":polaris-relational-jdbc")) From a57961912e235ee5c295af42d2485293ec4a404f Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 14:05:52 -0700 Subject: [PATCH 3/6] spotlessApply --- .../polaris/service/catalog/iceberg/IcebergCatalogHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index e771cbc16b..af0c4dcfb3 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -35,7 +35,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.BaseTable; import org.apache.iceberg.MetadataUpdate; From d38ef56c014ba339dd5da2ef2e3941edbd54c19a Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 14:13:49 -0700 Subject: [PATCH 4/6] Remove double negative in external factory resolution check --- .../polaris/service/catalog/iceberg/IcebergCatalogHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index af0c4dcfb3..266ac11e49 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -222,7 +222,7 @@ protected void initializeCatalog() { Instance externalCatalogFactory = externalCatalogFactories.select( Identifier.Literal.of(connectionType.getFactoryIdentifier())); - if (!externalCatalogFactory.isUnsatisfied()) { + if (externalCatalogFactory.isResolvable()) { federatedCatalog = externalCatalogFactory .get() From 29bb8bf84ebff3cc1fcae0a92d4ec34f1d1e54f6 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Thu, 14 Aug 2025 10:13:51 -0700 Subject: [PATCH 5/6] Change identifier to lowerCase --- .../org/apache/polaris/core/connection/ConnectionType.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java index 2e09366a31..458073ed6e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java @@ -20,6 +20,7 @@ import jakarta.annotation.Nonnull; import java.util.Arrays; +import java.util.Locale; /** * The internal persistence-object counterpart to ConnectionConfigInfo.ConnectionTypeEnum defined in @@ -35,8 +36,8 @@ public enum ConnectionType { HADOOP(2), ; - public static final String ICEBERG_REST_FACTORY_IDENTIFIER = "ICEBERG_REST"; - public static final String HADOOP_FACTORY_IDENTIFIER = "HADOOP"; + public static final String ICEBERG_REST_FACTORY_IDENTIFIER = ICEBERG_REST.name().toLowerCase(Locale.ROOT); + public static final String HADOOP_FACTORY_IDENTIFIER = HADOOP.name().toLowerCase(Locale.ROOT); private static final ConnectionType[] REVERSE_MAPPING_ARRAY; From d1537de872f0a2cbfa7bd2ce3d5690bd7a67f562 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Thu, 14 Aug 2025 10:36:00 -0700 Subject: [PATCH 6/6] Change identifiers to constants --- .../org/apache/polaris/core/connection/ConnectionType.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java index 458073ed6e..7c5092c431 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java @@ -20,7 +20,6 @@ import jakarta.annotation.Nonnull; import java.util.Arrays; -import java.util.Locale; /** * The internal persistence-object counterpart to ConnectionConfigInfo.ConnectionTypeEnum defined in @@ -36,8 +35,8 @@ public enum ConnectionType { HADOOP(2), ; - public static final String ICEBERG_REST_FACTORY_IDENTIFIER = ICEBERG_REST.name().toLowerCase(Locale.ROOT); - public static final String HADOOP_FACTORY_IDENTIFIER = HADOOP.name().toLowerCase(Locale.ROOT); + public static final String ICEBERG_REST_FACTORY_IDENTIFIER = "iceberg_rest"; + public static final String HADOOP_FACTORY_IDENTIFIER = "hadoop"; private static final ConnectionType[] REVERSE_MAPPING_ARRAY;