/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.cli;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hive.testutils.MiniZooKeeperCluster;
import org.apache.iceberg.rest.discovery.RESTCatalogEndpointDiscovery;
import org.apache.iceberg.rest.ha.RESTCatalogHARegistry;
import org.apache.iceberg.rest.ha.RESTCatalogHARegistryHelper;
import org.apache.iceberg.rest.ha.RESTCatalogInstance;
import org.apache.iceberg.rest.standalone.StandaloneRESTCatalogServer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for REST Catalog High Availability.
 * Tests ZooKeeper-based service discovery and leader election.
 *
 * <p>Each test starts one or more {@link StandaloneRESTCatalogServer} instances on distinct
 * local ports, backed by a shared embedded ZooKeeper and a shared embedded HMS started once
 * per class. A unique ZooKeeper registry namespace is generated per test so that stale
 * registrations from earlier tests cannot leak into later ones.
 */
public class TestRESTCatalogHA {
  private static final Logger LOG = LoggerFactory.getLogger(TestRESTCatalogHA.class);

  private static MiniZooKeeperCluster zkCluster;
  private static int zkPort;
  private static File zkTempDir;
  private static int hmsPort = -1; // Embedded HMS port; -1 means "not started"

  private Configuration conf;
  private StandaloneRESTCatalogServer server1;
  private StandaloneRESTCatalogServer server2;
  private StandaloneRESTCatalogServer server3;
  private String testNamespace; // Unique namespace per test to avoid stale instances

  /**
   * Starts the shared embedded ZooKeeper cluster and embedded HMS used by every test.
   */
  @BeforeClass
  public static void setupZooKeeper() throws Exception {
    // Create temporary directory for ZooKeeper data
    zkTempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "zk_test_").toFile();

    // Start embedded ZooKeeper cluster
    zkCluster = new MiniZooKeeperCluster();
    zkPort = zkCluster.startup(zkTempDir);

    LOG.info("Started embedded ZooKeeper on port: {}", zkPort);

    // Start embedded HMS for REST Catalog operations
    Configuration hmsConf = MetastoreConf.newMetastoreConf();
    MetaStoreTestUtils.setConfForStandloneMode(hmsConf);

    // Set up an in-memory Derby database so the HMS leaves no on-disk state behind
    File hmsTempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "hms_test_").toFile();
    String jdbcUrl = "jdbc:derby:memory:" + hmsTempDir.getAbsolutePath() + File.separator + "metastore_db;create=true";
    MetastoreConf.setVar(hmsConf, ConfVars.CONNECT_URL_KEY, jdbcUrl);

    // Set warehouse directories
    File warehouseDir = new File(hmsTempDir, "warehouse");
    warehouseDir.mkdirs();
    MetastoreConf.setVar(hmsConf, ConfVars.WAREHOUSE, warehouseDir.getAbsolutePath());

    File warehouseExternalDir = new File(hmsTempDir, "warehouse_external");
    warehouseExternalDir.mkdirs();
    MetastoreConf.setVar(hmsConf, ConfVars.WAREHOUSE_EXTERNAL, warehouseExternalDir.getAbsolutePath());

    // Start HMS
    hmsPort = MetaStoreTestUtils.startMetaStoreWithRetry(
        HadoopThriftAuthBridge.getBridge(), hmsConf, true, false, false, false);
    LOG.info("Started embedded HMS on port: {}", hmsPort);
  }

  /**
   * Shuts down the shared ZooKeeper cluster and embedded HMS, and removes temp data.
   */
  @AfterClass
  public static void teardownZooKeeper() throws Exception {
    if (zkCluster != null) {
      zkCluster.shutdown();
      zkCluster = null;
    }
    if (zkTempDir != null && zkTempDir.exists()) {
      deleteDirectory(zkTempDir);
    }
    // Stop embedded HMS
    if (hmsPort > 0) {
      try {
        MetaStoreTestUtils.close(hmsPort);
        LOG.info("Stopped embedded HMS on port: {}", hmsPort);
      } catch (Exception e) {
        LOG.warn("Error stopping HMS", e);
      }
      hmsPort = -1;
    }
  }

  /**
   * Recursively deletes a directory tree. Best-effort: individual delete failures are ignored.
   */
  private static void deleteDirectory(File directory) {
    if (!directory.exists()) {
      return;
    }
    File[] files = directory.listFiles();
    if (files != null) {
      for (File file : files) {
        if (file.isDirectory()) {
          deleteDirectory(file);
        } else {
          file.delete();
        }
      }
    }
    directory.delete();
  }

  /**
   * Builds the per-test base configuration: ZooKeeper quorum, a unique HA registry
   * namespace, HMS thrift URI, warehouse paths, servlet path and (disabled) auth.
   */
  @Before
  public void setup() throws Exception {
    conf = new Configuration();

    // Configure embedded ZooKeeper
    // IMPORTANT: Set ZooKeeper quorum BEFORE THRIFT_URIS to avoid ZkRegistryBase
    // using THRIFT_URIS (which contains HMS URI) as ZooKeeper connection string
    String zkQuorum = "localhost:" + zkPort;
    conf.set("hive.zookeeper.quorum", zkQuorum);
    conf.set("hive.zookeeper.client.port", String.valueOf(zkPort));
    MetastoreConf.setVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT, String.valueOf(zkPort));

    // Enable REST Catalog HA.
    // Use unique namespace per test to avoid stale instances from previous tests
    // (similar to how Hive tests use unique instance IDs).
    testNamespace = "restCatalogHATest-" + UUID.randomUUID();
    MetastoreConf.setBoolVar(conf, ConfVars.REST_CATALOG_HA_ENABLED, true);
    MetastoreConf.setVar(conf, ConfVars.REST_CATALOG_HA_REGISTRY_NAMESPACE, testNamespace);
    MetastoreConf.setVar(conf, ConfVars.REST_CATALOG_HA_MODE, "active-passive");

    // Configure HMS connection (for REST Catalog to connect to HMS).
    // Set AFTER the ZooKeeper config; ZkRegistryBase checks THRIFT_URIS first, but
    // since hive.zookeeper.quorum is set it should use that instead.
    String hmsUri = "thrift://localhost:" + hmsPort;
    MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, hmsUri);

    // Configure warehouse
    MetastoreConf.setVar(conf, ConfVars.WAREHOUSE, "/tmp/warehouse");
    MetastoreConf.setVar(conf, ConfVars.WAREHOUSE_EXTERNAL, "/tmp/warehouse_external");

    // Configure REST Catalog servlet path (required for servlet creation).
    // Note: Path should NOT have a leading slash - ServletServerBuilder adds it.
    MetastoreConf.setVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH, "iceberg");

    // Configure authentication to "none" for tests (avoids JWT/OAuth2 configuration)
    MetastoreConf.setVar(conf, ConfVars.CATALOG_SERVLET_AUTH, "none");

    LOG.info("Test setup complete. ZK: {}, HMS: {}", zkQuorum, hmsUri);
  }

  @After
  public void teardown() throws Exception {
    server1 = stopQuietly(server1, "server1");
    server2 = stopQuietly(server2, "server2");
    server3 = stopQuietly(server3, "server3");
  }

  /**
   * Stops a server, swallowing (but logging) any exception so teardown always completes.
   *
   * @return {@code null}, so callers can clear their field in one statement
   */
  private static StandaloneRESTCatalogServer stopQuietly(StandaloneRESTCatalogServer server, String name) {
    if (server != null) {
      try {
        server.stop();
      } catch (Exception e) {
        LOG.warn("Error stopping {}", name, e);
      }
    }
    return null;
  }

  /**
   * Creates and starts a standalone REST Catalog server on the given port, derived
   * from the supplied base configuration.
   */
  private static StandaloneRESTCatalogServer startServer(Configuration base, int port) throws Exception {
    Configuration serverConf = new Configuration(base);
    MetastoreConf.setLongVar(serverConf, ConfVars.CATALOG_SERVLET_PORT, port);
    MetastoreConf.setVar(serverConf, ConfVars.REST_CATALOG_INSTANCE_URI, "localhost:" + port);
    LOG.info("Creating and starting REST Catalog server on port {}...", port);
    StandaloneRESTCatalogServer server = new StandaloneRESTCatalogServer(serverConf);
    server.start();
    LOG.info("Server on port {} started successfully", port);
    return server;
  }

  /**
   * Reads an entire HTTP response body, concatenating trimmed lines (preserves the
   * original test's parsing behavior). Returns "" for a {@code null} stream, which
   * {@link HttpURLConnection#getErrorStream()} can legitimately return.
   */
  private static String readResponse(InputStream stream) throws Exception {
    if (stream == null) {
      return "";
    }
    StringBuilder builder = new StringBuilder();
    try (BufferedReader br = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
      String line;
      while ((line = br.readLine()) != null) {
        builder.append(line.trim());
      }
    }
    return builder.toString();
  }

  @Test(timeout = 60000) // 60 second timeout
  public void testServiceDiscovery() throws Exception {
    LOG.info("=== Test: Service Discovery ===");

    // Start multiple REST Catalog instances
    server1 = startServer(conf, 8081);
    server2 = startServer(conf, 8082);

    // Wait for registration in ZooKeeper.
    // NOTE(review): fixed sleeps are inherently racy; consider polling the registry instead.
    LOG.info("Waiting for instances to register in ZooKeeper...");
    Thread.sleep(3000);

    // Client discovers instances
    LOG.info("Creating discovery client...");
    RESTCatalogEndpointDiscovery discovery = new RESTCatalogEndpointDiscovery(conf);
    LOG.info("Fetching instances from ZooKeeper...");
    Collection<RESTCatalogInstance> instances = discovery.getAllInstances();

    assertNotNull("Instances should be discovered", instances);
    assertTrue("Should discover at least 2 instances", instances.size() >= 2);

    LOG.info("Discovered {} REST Catalog instances", instances.size());
    for (RESTCatalogInstance instance : instances) {
      LOG.info("Instance: {}", instance);
    }

    discovery.close();
  }

  @Test(timeout = 60000) // 60 second timeout
  public void testLeaderElection() throws Exception {
    LOG.info("=== Test: Leader Election ===");

    // Start multiple instances
    server1 = startServer(conf, 8083);
    server2 = startServer(conf, 8084);

    // Wait for leader election
    Thread.sleep(3000);

    // Check leader
    RESTCatalogHARegistry registry = RESTCatalogHARegistryHelper.getRegistry(conf);
    RESTCatalogInstance leader = registry.getLeader();

    assertNotNull("Leader should be elected", leader);
    assertTrue("Leader should be marked as leader", leader.isLeader());

    LOG.info("Leader instance: {}", leader);

    // Verify only one leader
    Collection<RESTCatalogInstance> instances = registry.getAll();
    int leaderCount = 0;
    for (RESTCatalogInstance instance : instances) {
      if (instance.isLeader()) {
        leaderCount++;
      }
    }
    assertEquals("Should have exactly one leader", 1, leaderCount);

    registry.stop();
  }

  @Test(timeout = 60000) // 60 second timeout
  public void testClientConnectsToLeader() throws Exception {
    LOG.info("=== Test: Client Connects to Leader ===");

    // Start instances
    server1 = startServer(conf, 8085);
    server2 = startServer(conf, 8086);

    // Wait for leader election
    Thread.sleep(3000);

    // Client should connect to leader
    RESTCatalogEndpointDiscovery discovery = new RESTCatalogEndpointDiscovery(conf);
    String endpoint = discovery.getEndpoint();

    assertNotNull("Endpoint should be discovered", endpoint);
    assertTrue("Endpoint should contain /iceberg", endpoint.contains("/iceberg"));

    LOG.info("Client connecting to: {}", endpoint);

    // Verify it's the leader's endpoint
    RESTCatalogInstance leader = discovery.getLeader();
    assertNotNull("Leader should exist", leader);
    assertEquals("Endpoint should match leader", leader.getRestEndpoint(), endpoint);

    discovery.close();
  }

  @Test(timeout = 60000) // 60 second timeout
  public void testRESTCatalogOperations() throws Exception {
    LOG.info("=== Test: REST Catalog Operations in HA Mode ===");

    // Start instances
    server1 = startServer(conf, 8091);
    server2 = startServer(conf, 8092);

    // Wait for leader election
    Thread.sleep(3000);

    // Discover leader endpoint
    RESTCatalogEndpointDiscovery discovery = new RESTCatalogEndpointDiscovery(conf);
    String endpoint = discovery.getEndpoint();
    assertNotNull("Endpoint should be discovered", endpoint);
    LOG.info("Discovered REST Catalog endpoint: {}", endpoint);

    // Verify we're connected to the leader
    RESTCatalogInstance leader = discovery.getLeader();
    assertNotNull("Leader should exist", leader);
    assertTrue("Leader should be marked as leader", leader.isLeader());
    assertEquals("Endpoint should match leader", leader.getRestEndpoint(), endpoint);

    // Make an actual HTTP call to the REST Catalog server to verify it's working.
    // Call the config endpoint (GET /v1/config) which should return catalog configuration.
    URL configUrl = new URL(endpoint.replace("/iceberg", "/iceberg/v1/config"));
    HttpURLConnection connection = (HttpURLConnection) configUrl.openConnection();
    connection.setRequestMethod("GET");
    connection.setConnectTimeout(5000);
    connection.setReadTimeout(5000);

    try {
      int responseCode = connection.getResponseCode();
      LOG.info("REST Catalog config endpoint returned status: {}", responseCode);

      // The endpoint should respond. 200 means success; 401/403 means auth required
      // (server is still up and working); 404 still proves the listener is alive.
      assertTrue("REST Catalog server should respond",
          responseCode == 200 || responseCode == 401 || responseCode == 403 || responseCode == 404);

      LOG.info("Successfully verified REST Catalog server is responding at: {}", endpoint);
    } finally {
      connection.disconnect();
      discovery.close();
    }
  }

  @Test(timeout = 60000) // 60 second timeout
  public void testCreateDatabaseViaHTTP() throws Exception {
    LOG.info("=== Test: Create Database via HTTP in HA Mode ===");

    // Start instances
    server1 = startServer(conf, 8093);
    server2 = startServer(conf, 8094);

    // Wait for leader election
    Thread.sleep(3000);

    // Discover leader endpoint
    RESTCatalogEndpointDiscovery discovery = new RESTCatalogEndpointDiscovery(conf);
    String endpoint = discovery.getEndpoint();
    assertNotNull("Endpoint should be discovered", endpoint);
    LOG.info("Discovered REST Catalog endpoint: {}", endpoint);

    // Verify we're connected to the leader
    RESTCatalogInstance leader = discovery.getLeader();
    assertNotNull("Leader should exist", leader);
    assertTrue("Leader should be marked as leader", leader.isLeader());

    // Create a database via HTTP POST to /v1/namespaces
    String testDbName = "test_db_" + System.currentTimeMillis();
    String createNamespaceUrl = endpoint + "/v1/namespaces";

    // CreateNamespaceRequest JSON format: {"namespace": ["database_name"], "properties": {}}
    String requestBody = String.format("{\"namespace\":[\"%s\"],\"properties\":{}}", testDbName);

    URL url = new URL(createNamespaceUrl);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    conn.setConnectTimeout(5000);
    conn.setReadTimeout(5000);

    try {
      // Send request
      try (OutputStream os = conn.getOutputStream()) {
        byte[] input = requestBody.getBytes(StandardCharsets.UTF_8);
        os.write(input, 0, input.length);
      }

      int responseCode = conn.getResponseCode();
      LOG.info("Create namespace response code: {}", responseCode);

      if (responseCode >= 200 && responseCode < 300) {
        String response = readResponse(conn.getInputStream());
        LOG.info("Create namespace response: {}", response);
        assertTrue("Namespace should be created successfully", responseCode == 200 || responseCode == 201);
        assertTrue("Response should contain namespace name", response.contains(testDbName));
      } else {
        // getErrorStream() may be null when the server sent no body - readResponse handles it
        String response = readResponse(conn.getErrorStream());
        LOG.error("Failed to create namespace. Response code: {}, Error: {}", responseCode, response);
        // Don't fail if it's auth-related (401/403) - that's expected without proper auth headers
        if (responseCode != 401 && responseCode != 403) {
          fail("Failed to create namespace: " + response);
        }
      }

      // Verify the namespace exists by calling GET /v1/namespaces/{namespace}
      String getNamespaceUrl = endpoint + "/v1/namespaces/" + testDbName;
      URL getUrl = new URL(getNamespaceUrl);
      HttpURLConnection getConn = (HttpURLConnection) getUrl.openConnection();
      getConn.setRequestMethod("GET");
      getConn.setConnectTimeout(5000);
      getConn.setReadTimeout(5000);

      try {
        int getResponseCode = getConn.getResponseCode();
        LOG.info("Get namespace response code: {}", getResponseCode);

        if (getResponseCode == 200) {
          String getResponse = readResponse(getConn.getInputStream());
          LOG.info("Get namespace response: {}", getResponse);
          assertTrue("Response should contain namespace name", getResponse.contains(testDbName));
        }
      } finally {
        getConn.disconnect();
      }

      LOG.info("Successfully created and verified database '{}' via REST Catalog HA leader", testDbName);
    } finally {
      conn.disconnect();
      discovery.close();
    }
  }

  @Test(timeout = 60000) // 60 second timeout
  public void testFailover() throws Exception {
    LOG.info("=== Test: Failover ===");

    // Start instances
    server1 = startServer(conf, 8087);
    server2 = startServer(conf, 8088);

    // Wait for leader election - poll the servers' own isLeader() state
    int retries = 20;
    while (retries > 0 && !server1.isLeader() && !server2.isLeader()) {
      Thread.sleep(500);
      retries--;
    }
    assertTrue("One of the servers should be leader", server1.isLeader() || server2.isLeader());

    // Determine which server is the initial leader
    StandaloneRESTCatalogServer initialLeaderServer = server1.isLeader() ? server1 : server2;
    StandaloneRESTCatalogServer standbyServer = server1.isLeader() ? server2 : server1;

    LOG.info("Initial leader is server1: {}", server1.isLeader());

    // Get registry and verify initial state (following the Hive test pattern).
    // NOTE(review): getRegistry() is presumably a cached/shared instance; if it creates a
    // fresh registry each call, the intermediate ones below would leak - confirm in helper.
    RESTCatalogHARegistry registry = RESTCatalogHARegistryHelper.getRegistry(conf);

    // Helper to filter instances by our test ports (8087, 8088)
    Predicate<RESTCatalogInstance> isTestInstance = instance -> {
      String restEndpoint = instance.getRestEndpoint();
      return restEndpoint != null && (restEndpoint.contains(":8087") || restEndpoint.contains(":8088"));
    };

    // Wait for both test instances to be registered
    retries = 30;
    while (retries > 0) {
      long testInstanceCount = registry.getAll().stream().filter(isTestInstance).count();
      if (testInstanceCount >= 2) {
        break;
      }
      Thread.sleep(100);
      retries--;
    }

    // Get fresh registry instance (following the Hive test pattern)
    registry = RESTCatalogHARegistryHelper.getRegistry(conf);
    Collection<RESTCatalogInstance> allInstances = registry.getAll();
    List<RESTCatalogInstance> instances = allInstances.stream()
        .filter(isTestInstance)
        .collect(Collectors.toList());
    assertEquals("Should have 2 test instances", 2, instances.size());

    // Separate into leaders and standby (following the Hive test pattern)
    List<RESTCatalogInstance> leaders = new ArrayList<>();
    List<RESTCatalogInstance> standby = new ArrayList<>();
    for (RESTCatalogInstance instance : instances) {
      if (instance.isLeader()) {
        leaders.add(instance);
      } else {
        standby.add(instance);
      }
    }
    assertEquals("Should have exactly 1 leader", 1, leaders.size());
    assertEquals("Should have exactly 1 standby", 1, standby.size());

    RESTCatalogInstance initialLeaderInstance = leaders.get(0);
    LOG.info("Initial leader instance: {}", initialLeaderInstance);

    // Stop the leader server and clear the matching field so teardown() doesn't double-stop it
    initialLeaderServer.stop();
    if (initialLeaderServer == server1) {
      server1 = null;
    } else {
      server2 = null;
    }

    // Wait for standby to become leader
    retries = 20;
    while (retries > 0 && !standbyServer.isLeader()) {
      Thread.sleep(500);
      retries--;
    }
    assertTrue("Standby server should become leader after failover", standbyServer.isLeader());

    // Wait for registry to update - filter by test ports to avoid stale instances
    retries = 50; // 5 seconds max
    while (retries > 0) {
      long testInstanceCount = registry.getAll().stream().filter(isTestInstance).count();
      if (testInstanceCount == 1) {
        break;
      }
      Thread.sleep(100);
      retries--;
    }
    assertEquals("Should have 1 test instance after leader stops",
        1, registry.getAll().stream().filter(isTestInstance).count());

    // Get fresh registry instance (following the Hive test pattern)
    registry = RESTCatalogHARegistryHelper.getRegistry(conf);
    allInstances = registry.getAll();
    instances = allInstances.stream()
        .filter(isTestInstance)
        .collect(Collectors.toList());
    assertEquals("Should have 1 test instance after leader stops", 1, instances.size());

    // Verify new leader (following the Hive test pattern)
    leaders = new ArrayList<>();
    standby = new ArrayList<>();
    for (RESTCatalogInstance instance : instances) {
      if (instance.isLeader()) {
        leaders.add(instance);
      } else {
        standby.add(instance);
      }
    }
    assertEquals("Should have exactly 1 leader", 1, leaders.size());
    assertEquals("Should have 0 standby", 0, standby.size());

    RESTCatalogInstance newLeaderInstance = leaders.get(0);
    assertTrue("New leader should be different from old leader",
        !newLeaderInstance.getWorkerIdentity().equals(initialLeaderInstance.getWorkerIdentity()));
    LOG.info("New leader instance after failover: {}", newLeaderInstance);

    registry.stop();
  }

  @Test(timeout = 60000) // 60 second timeout
  public void testActiveActiveMode() throws Exception {
    LOG.info("=== Test: Active-Active Mode ===");

    // Configure for active-active
    Configuration confActiveActive = new Configuration(conf);
    MetastoreConf.setVar(confActiveActive, ConfVars.REST_CATALOG_HA_MODE, "active-active");

    // Start instances
    server1 = startServer(confActiveActive, 8089);
    server2 = startServer(confActiveActive, 8090);

    // Wait for registration
    Thread.sleep(2000);

    // Client should get a (possibly different) instance per call in active-active mode
    RESTCatalogEndpointDiscovery discovery = new RESTCatalogEndpointDiscovery(confActiveActive);
    String endpoint1 = discovery.getEndpoint();
    String endpoint2 = discovery.getEndpoint();

    assertNotNull("Endpoint should be discovered", endpoint1);
    assertNotNull("Endpoint should be discovered", endpoint2);

    // In active-active mode, we might get different instances
    LOG.info("Endpoint 1: {}", endpoint1);
    LOG.info("Endpoint 2: {}", endpoint2);

    Collection<RESTCatalogInstance> instances = discovery.getAllInstances();
    assertTrue("Should have multiple instances", instances.size() >= 2);

    discovery.close();
  }
}
a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestRESTCatalogModeSelection.java b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestRESTCatalogModeSelection.java new file mode 100644 index 000000000000..17f754f6fb2d --- /dev/null +++ b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestRESTCatalogModeSelection.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.cli; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; +import org.apache.hive.testutils.MiniZooKeeperCluster; +import org.apache.iceberg.rest.HMSCatalogFactory; +import org.apache.iceberg.rest.standalone.StandaloneRESTCatalogServer; +import org.eclipse.jetty.server.Server; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests to validate that the correct HMS REST Catalog server + * implementation is used based on configuration. + * + * Tests three modes: + * 1. Embedded mode: REST Catalog servlet runs within HMS process + * 2. Standalone HA mode: Standalone server with High Availability enabled + * 3. Standalone non-HA mode: Standalone server without HA + * 4. 
Disabled mode: REST Catalog is not started + */ +public class TestRESTCatalogModeSelection { + private static final Logger LOG = LoggerFactory.getLogger(TestRESTCatalogModeSelection.class); + + private static MiniZooKeeperCluster zkCluster; + private static int zkPort; + private static File zkTempDir; + private static int hmsPort = -1; // Embedded HMS port + + private Configuration conf; + private StandaloneRESTCatalogServer standaloneServer; + private Server embeddedServletServer; + + @BeforeClass + public static void setupZooKeeper() throws Exception { + // Create temporary directory for ZooKeeper data + zkTempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "zk_mode_test_").toFile(); + + // Start embedded ZooKeeper cluster + zkCluster = new MiniZooKeeperCluster(); + zkPort = zkCluster.startup(zkTempDir); + + LOG.info("Started embedded ZooKeeper on port: {}", zkPort); + + // Start embedded HMS for testing embedded mode + Configuration hmsConf = MetastoreConf.newMetastoreConf(); + MetaStoreTestUtils.setConfForStandloneMode(hmsConf); + + // Set up Derby database + File hmsTempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "hms_mode_test_").toFile(); + String jdbcUrl = "jdbc:derby:memory:" + hmsTempDir.getAbsolutePath() + File.separator + "metastore_db;create=true"; + MetastoreConf.setVar(hmsConf, ConfVars.CONNECT_URL_KEY, jdbcUrl); + + // Set warehouse directories + File warehouseDir = new File(hmsTempDir, "warehouse"); + warehouseDir.mkdirs(); + MetastoreConf.setVar(hmsConf, ConfVars.WAREHOUSE, warehouseDir.getAbsolutePath()); + + File warehouseExternalDir = new File(hmsTempDir, "warehouse_external"); + warehouseExternalDir.mkdirs(); + MetastoreConf.setVar(hmsConf, ConfVars.WAREHOUSE_EXTERNAL, warehouseExternalDir.getAbsolutePath()); + + // Start HMS + hmsPort = MetaStoreTestUtils.startMetaStoreWithRetry( + HadoopThriftAuthBridge.getBridge(), hmsConf, true, false, false, false); + LOG.info("Started 
embedded HMS on port: {}", hmsPort); + } + + @AfterClass + public static void teardownZooKeeper() throws Exception { + if (zkCluster != null) { + zkCluster.shutdown(); + zkCluster = null; + } + if (zkTempDir != null && zkTempDir.exists()) { + deleteDirectory(zkTempDir); + } + // Stop embedded HMS + if (hmsPort > 0) { + try { + MetaStoreTestUtils.close(hmsPort); + LOG.info("Stopped embedded HMS on port: {}", hmsPort); + } catch (Exception e) { + LOG.warn("Error stopping HMS", e); + } + hmsPort = -1; + } + } + + private static void deleteDirectory(File directory) { + if (directory.exists()) { + File[] files = directory.listFiles(); + if (files != null) { + for (File file : files) { + if (file.isDirectory()) { + deleteDirectory(file); + } else { + file.delete(); + } + } + } + directory.delete(); + } + } + + @Before + public void setup() throws Exception { + conf = new Configuration(); + + // Configure embedded ZooKeeper + String zkQuorum = "localhost:" + zkPort; + conf.set("hive.zookeeper.quorum", zkQuorum); + conf.set("hive.zookeeper.client.port", String.valueOf(zkPort)); + MetastoreConf.setVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT, String.valueOf(zkPort)); + + // Configure HMS connection + String hmsUri = "thrift://localhost:" + hmsPort; + MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, hmsUri); + + // Configure warehouse + MetastoreConf.setVar(conf, ConfVars.WAREHOUSE, "/tmp/warehouse"); + MetastoreConf.setVar(conf, ConfVars.WAREHOUSE_EXTERNAL, "/tmp/warehouse_external"); + + // Configure REST Catalog servlet path (required for servlet creation) + MetastoreConf.setVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH, "iceberg"); + + // Configure authentication to "none" for tests + MetastoreConf.setVar(conf, ConfVars.CATALOG_SERVLET_AUTH, "none"); + + LOG.info("Test setup complete. 
ZK: {}, HMS: {}", zkQuorum, hmsUri); + } + + @After + public void teardown() throws Exception { + if (standaloneServer != null) { + try { + standaloneServer.stop(); + } catch (Exception e) { + LOG.warn("Error stopping standalone server", e); + } + standaloneServer = null; + } + if (embeddedServletServer != null) { + try { + embeddedServletServer.stop(); + } catch (Exception e) { + LOG.warn("Error stopping embedded servlet server", e); + } + embeddedServletServer = null; + } + } + + /** + * Test that embedded mode is used when: + * - CATALOG_SERVLET_PORT >= 0 + * - REST_CATALOG_HA_ENABLED = false (or not set) + * + * In embedded mode, HMS starts the REST Catalog servlet internally. + */ + @Test(timeout = 60000) + public void testEmbeddedMode() throws Exception { + LOG.info("=== Test: Embedded Mode ==="); + + // Configure for embedded mode: port >= 0, HA disabled + MetastoreConf.setLongVar(conf, ConfVars.CATALOG_SERVLET_PORT, 8095); + MetastoreConf.setBoolVar(conf, ConfVars.REST_CATALOG_HA_ENABLED, false); + + // Simulate what HMS does: call HMSCatalogFactory.createServlet() + // This should return a non-null descriptor, indicating embedded mode is enabled + org.apache.hadoop.hive.metastore.ServletServerBuilder.Descriptor descriptor = + HMSCatalogFactory.createServlet(conf); + + assertNotNull("REST Catalog servlet descriptor should be created for embedded mode", descriptor); + assertTrue("Port should be >= 0", descriptor.getPort() >= 0); + assertEquals("Path should match", "iceberg", descriptor.getPath()); + assertNotNull("Servlet should not be null", descriptor.getServlet()); + + // Verify that creating a standalone server would fail or not be used + // (In real deployment, standalone server wouldn't be started if embedded is enabled) + + LOG.info("Embedded mode test passed: servlet descriptor created successfully"); + } + + /** + * Test that standalone HA mode is used when: + * - CATALOG_SERVLET_PORT >= 0 + * - REST_CATALOG_HA_ENABLED = true + * + * In standalone 
HA mode, StandaloneRESTCatalogServer starts with HA registry. + */ + @Test(timeout = 60000) + public void testStandaloneHAMode() throws Exception { + LOG.info("=== Test: Standalone HA Mode ==="); + + // Configure for standalone HA mode + MetastoreConf.setLongVar(conf, ConfVars.CATALOG_SERVLET_PORT, 8096); + MetastoreConf.setBoolVar(conf, ConfVars.REST_CATALOG_HA_ENABLED, true); + + // Use unique namespace per test + String testNamespace = "restCatalogModeTest-" + UUID.randomUUID().toString(); + MetastoreConf.setVar(conf, ConfVars.REST_CATALOG_HA_REGISTRY_NAMESPACE, testNamespace); + MetastoreConf.setVar(conf, ConfVars.REST_CATALOG_HA_MODE, "active-passive"); + MetastoreConf.setVar(conf, ConfVars.REST_CATALOG_INSTANCE_URI, "localhost:8096"); + + // Create and start standalone server + standaloneServer = new StandaloneRESTCatalogServer(conf); + standaloneServer.start(); + + // Wait for server to start and register + Thread.sleep(2000); + + // Verify server is running + Server server = standaloneServer.getServer(); + assertNotNull("Standalone server should be started", server); + assertTrue("Server should be started", server.isStarted()); + + // Verify HA registry is initialized (server should have HA registry) + // We can check by verifying the server responds to HTTP requests + String endpoint = "http://localhost:8096/iceberg"; + URL url = new URL(endpoint + "/v1/config"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + conn.setConnectTimeout(5000); + conn.setReadTimeout(5000); + + int responseCode = conn.getResponseCode(); + // Should get 200, 401, 403, or 404 (any response means server is running) + assertTrue("Server should respond (response code: " + responseCode + ")", + responseCode == 200 || responseCode == 401 || responseCode == 403 || responseCode == 404); + + LOG.info("Standalone HA mode test passed: server started and responding"); + } + + /** + * Test that standalone non-HA mode is used when: + * - 
CATALOG_SERVLET_PORT >= 0 + * - REST_CATALOG_HA_ENABLED = false + * - StandaloneRESTCatalogServer is started (not embedded in HMS) + * + * In standalone non-HA mode, StandaloneRESTCatalogServer starts without HA registry. + */ + @Test(timeout = 60000) + public void testStandaloneNonHAMode() throws Exception { + LOG.info("=== Test: Standalone Non-HA Mode ==="); + + // Configure for standalone non-HA mode + MetastoreConf.setLongVar(conf, ConfVars.CATALOG_SERVLET_PORT, 8097); + MetastoreConf.setBoolVar(conf, ConfVars.REST_CATALOG_HA_ENABLED, false); + MetastoreConf.setVar(conf, ConfVars.REST_CATALOG_INSTANCE_URI, "localhost:8097"); + + // Create and start standalone server + standaloneServer = new StandaloneRESTCatalogServer(conf); + standaloneServer.start(); + + // Wait for server to start + Thread.sleep(2000); + + // Verify server is running + Server server = standaloneServer.getServer(); + assertNotNull("Standalone server should be started", server); + assertTrue("Server should be started", server.isStarted()); + + // Verify server responds to HTTP requests + String endpoint = "http://localhost:8097/iceberg"; + URL url = new URL(endpoint + "/v1/config"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + conn.setConnectTimeout(5000); + conn.setReadTimeout(5000); + + int responseCode = conn.getResponseCode(); + assertTrue("Server should respond (response code: " + responseCode + ")", + responseCode == 200 || responseCode == 401 || responseCode == 403 || responseCode == 404); + + LOG.info("Standalone non-HA mode test passed: server started and responding"); + } + + /** + * Test that REST Catalog is disabled when: + * - CATALOG_SERVLET_PORT < 0 + * + * In disabled mode, HMSCatalogFactory.createServlet() returns null. 
+ */ + @Test(timeout = 60000) + public void testDisabledMode() throws Exception { + LOG.info("=== Test: Disabled Mode ==="); + + // Configure for disabled mode: port < 0 + MetastoreConf.setLongVar(conf, ConfVars.CATALOG_SERVLET_PORT, -1); + + // Call HMSCatalogFactory.createServlet() - should return null + org.apache.hadoop.hive.metastore.ServletServerBuilder.Descriptor descriptor = + HMSCatalogFactory.createServlet(conf); + + assertNull("REST Catalog servlet descriptor should be null when disabled", descriptor); + + LOG.info("Disabled mode test passed: servlet descriptor is null"); + } + + /** + * Test that REST Catalog is disabled when: + * - ICEBERG_CATALOG_SERVLET_PATH is null or empty + * + * In this case, HMSCatalogFactory.createServlet() returns null. + */ + @Test(timeout = 60000) + public void testDisabledModeNoPath() throws Exception { + LOG.info("=== Test: Disabled Mode (No Path) ==="); + + // Configure port but no path + MetastoreConf.setLongVar(conf, ConfVars.CATALOG_SERVLET_PORT, 8098); + MetastoreConf.setVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH, ""); // Empty path + + // Call HMSCatalogFactory.createServlet() - should return null + org.apache.hadoop.hive.metastore.ServletServerBuilder.Descriptor descriptor = + HMSCatalogFactory.createServlet(conf); + + assertNull("REST Catalog servlet descriptor should be null when path is empty", descriptor); + + LOG.info("Disabled mode (no path) test passed: servlet descriptor is null"); + } + + /** + * Test that embedded mode servlet can be started via ServletServerBuilder + * (simulating what HMS does internally). 
+ */ + @Test(timeout = 60000) + public void testEmbeddedModeServletStart() throws Exception { + LOG.info("=== Test: Embedded Mode Servlet Start ==="); + + // Configure for embedded mode + MetastoreConf.setLongVar(conf, ConfVars.CATALOG_SERVLET_PORT, 8099); + MetastoreConf.setBoolVar(conf, ConfVars.REST_CATALOG_HA_ENABLED, false); + + // Simulate what HMS does: create servlet descriptor and start server + org.apache.hadoop.hive.metastore.ServletServerBuilder.Descriptor descriptor = + HMSCatalogFactory.createServlet(conf); + + assertNotNull("REST Catalog servlet descriptor should be created", descriptor); + + // Start servlet server (simulating HMS's ServletServerBuilder) + org.apache.hadoop.hive.metastore.ServletServerBuilder builder = + new org.apache.hadoop.hive.metastore.ServletServerBuilder(conf); + builder.addServlet(descriptor); + embeddedServletServer = builder.start(LOG); + + assertNotNull("Embedded servlet server should be started", embeddedServletServer); + assertTrue("Server should be started", embeddedServletServer.isStarted()); + + // Verify server responds to HTTP requests + String endpoint = "http://localhost:8099/iceberg"; + URL url = new URL(endpoint + "/v1/config"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + conn.setConnectTimeout(5000); + conn.setReadTimeout(5000); + + int responseCode = conn.getResponseCode(); + assertTrue("Server should respond (response code: " + responseCode + ")", + responseCode == 200 || responseCode == 401 || responseCode == 403 || responseCode == 404); + + LOG.info("Embedded mode servlet start test passed: server started and responding"); + } +} + diff --git a/llap-client/pom.xml b/llap-client/pom.xml index f36ea3bf4231..889cee34b0db 100644 --- a/llap-client/pom.xml +++ b/llap-client/pom.xml @@ -141,6 +141,11 @@ hadoop-yarn-registry true + + org.apache.hive + hive-standalone-metastore-common + ${project.version} + org.mockito diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index 6e6ce31d6ff8..441316c742e4 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -14,116 +14,32 @@ package org.apache.hadoop.hive.registry.impl; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.framework.recipes.nodes.PersistentNode; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.utils.CloseableUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ZooKeeperHiveHelper; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapUtil; +import 
org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; -import org.apache.hadoop.hive.registry.RegistryUtilities; import org.apache.hadoop.hive.registry.ServiceInstance; -import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.registry.client.binding.RegistryUtils; -import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; -import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.InvalidACLException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - /** - * This is currently used for implementation inheritance only; it doesn't provide a unified flow - * into which one can just plug a few abstract method implementations, because providing one with - * getInstance method is a huge pain involving lots of generics. Also, different registries may - * have slightly different usage patterns anyway and noone would use a registry without knowing - * what type it is. So, it's mostly a grab bag of methods used by ServiceInstanceSet and other - * parts of each implementation. + * Hive-specific ZkRegistryBase that extends the base implementation from metastore-common. 
+ * This version uses HiveConf for ZooKeeper configuration to maintain compatibility with Hive components. + * + * For HMS components, use the base class directly from metastore-common. */ -public abstract class ZkRegistryBase { +public abstract class ZkRegistryBase + extends org.apache.hadoop.hive.metastore.registry.impl.ZkRegistryBase { private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class); - private final static String SASL_NAMESPACE = "sasl"; - private final static String UNSECURE_NAMESPACE = "unsecure"; - protected final static String USER_SCOPE_PATH_PREFIX = "user-"; - protected static final String WORKER_PREFIX = "worker-"; - protected static final String WORKER_GROUP = "workers"; - public static final String UNIQUE_IDENTIFIER = "registry.unique.id"; - protected static final UUID UNIQUE_ID = UUID.randomUUID(); - private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls(); - - protected final Configuration conf; - protected final CuratorFramework zooKeeperClient; - // workersPath is the directory path where all the worker znodes are located. - protected final String workersPath; - private final String workerNodePrefix; - - protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data - - private final Set> stateChangeListeners; - - protected final boolean doCheckAcls; - // Secure ZK is only set up by the registering service; anyone can read the registrations. - private final String zkPrincipal, zkKeytab, saslLoginContextName; - private String userNameFromPrincipal; // Only set when setting up the secure config for ZK. - private final String disableMessage; - - private final Lock instanceCacheLock = new ReentrantLock(); - // there can be only one instance per path - private final Map pathToInstanceCache; - // there can be multiple instances per node - private final Map> nodeToInstanceCache; - - // The registration znode. 
- private PersistentNode znode; - private String znodePath; // unique identity for this instance - - final String namespace; - - private PathChildrenCache instancesCache; // Created on demand. - - /** Local hostname. */ - protected static final String hostname = RegistryUtilities.getCanonicalHostName(); /** * @param rootNs A single root namespace override. Not recommended. @@ -140,84 +56,16 @@ public abstract class ZkRegistryBase { public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix, String userScopePathPrefix, String workerPrefix, String workerGroup, String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) { - this.conf = new Configuration(conf); - this.saslLoginContextName = zkSaslLoginContextName; - this.zkPrincipal = zkPrincipal; - this.zkKeytab = zkKeytab; - if (aclsConfig != null) { - this.doCheckAcls = HiveConf.getBoolVar(conf, aclsConfig); - this.disableMessage = "Set " + aclsConfig.varname + " to false to disable ACL validation"; - } else { - this.doCheckAcls = true; - this.disableMessage = ""; - } - this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - this.encoder = new RegistryUtils.ServiceRecordMarshal(); - - // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000 - // worker-0000000 is the sequence number which will be retained until session timeout. If a - // worker does not respond due to communication interruptions it will retain the same sequence - // number when it returns back. If session timeout expires, the node will be deleted and new - // addition of the same node (restart) will get next sequence number - final String userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf); - this.workerNodePrefix = workerPrefix == null ? 
WORKER_PREFIX : workerPrefix; - this.workersPath = "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup); - this.instancesCache = null; - this.stateChangeListeners = new HashSet<>(); - this.pathToInstanceCache = new ConcurrentHashMap<>(); - this.nodeToInstanceCache = new ConcurrentHashMap<>(); - this.namespace = getRootNamespace(conf, rootNs, nsPrefix); - ACLProvider aclProvider; - // get acl provider for most outer path that is non-null - if (userPathPrefix == null) { - if (instanceName == null) { - if (workerGroup == null) { - aclProvider = getACLProviderForZKPath(namespace); - } else { - aclProvider = getACLProviderForZKPath(workerGroup); - } - } else { - aclProvider = getACLProviderForZKPath(instanceName); - } - } else { - aclProvider = getACLProviderForZKPath(userScopePathPrefix); - } - this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider); - this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener()); - } - - public static String getRootNamespace(Configuration conf, String userProvidedNamespace, - String defaultNamespacePrefix) { - final boolean isSecure = isZkEnforceSASLClient(conf); - String rootNs = userProvidedNamespace; - if (rootNs == null) { - rootNs = defaultNamespacePrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); - } - return rootNs; - } - - private ACLProvider getACLProviderForZKPath(String zkPath) { - final boolean isSecure = isZkEnforceSASLClient(conf); - return new ACLProvider() { - @Override - public List getDefaultAcl() { - // We always return something from getAclForPath so this should not happen. - LOG.warn("getDefaultAcl was called"); - return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); - } - - @Override - public List getAclForPath(String path) { - if (!isSecure || path == null || !path.contains(zkPath)) { - // No security or the path is below the user path - full access. 
- return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); - } - return createSecureAcls(); - } - }; + super(instanceName, conf, rootNs, nsPrefix, userScopePathPrefix, workerPrefix, workerGroup, + zkSaslLoginContextName, zkPrincipal, zkKeytab, + aclsConfig != null ? MetastoreConf.ConfVars.THRIFT_ZOOKEEPER_USE_KERBEROS : null); } - private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) { + /** + * Override to use HiveConf for ZooKeeper client configuration. + */ + @Override + protected CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) { String keyStorePassword = ""; String trustStorePassword = ""; if (HiveConf.getBoolVar(conf, ConfVars.HIVE_ZOOKEEPER_SSL_ENABLE)) { @@ -251,424 +99,19 @@ private CuratorFramework getZookeeperClient(Configuration conf, String namespace .build().getNewZookeeperClient(zooKeeperAclProvider, namespace); } - private static List createSecureAcls() { - // Read all to the world - List nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE); - // Create/Delete/Write/Admin to creator - nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL); - return nodeAcls; - } - /** - * Get the ensemble server addresses from the configuration. The format is: host1:port, - * host2:port.. 
- * - * @param conf configuration - **/ - private static String getQuorumServers(Configuration conf) { - String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname); - String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, - ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue()); - StringBuilder quorum = new StringBuilder(); - for (int i = 0; i < hosts.length; i++) { - quorum.append(hosts[i].trim()); - if (!hosts[i].contains(":")) { - // if the hostname doesn't contain a port, add the configured port to hostname - quorum.append(":"); - quorum.append(port); - } - - if (i != hosts.length - 1) { - quorum.append(","); - } - } - - return quorum.toString(); - } - - protected abstract String getZkPathUser(Configuration conf); - - protected final String registerServiceRecord(ServiceRecord srv) throws IOException { - return registerServiceRecord(srv, UNIQUE_ID.toString()); - } - - protected final String registerServiceRecord(ServiceRecord srv, final String uniqueId) throws IOException { - // restart sensitive instance id - srv.set(UNIQUE_IDENTIFIER, uniqueId); - - // Create a znode under the rootNamespace parent for this instance of the server - try { - // PersistentNode will make sure the ephemeral node created on server will be present - // even under connection or session interruption (will automatically handle retries) - znode = new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false, - workersPath + "/" + workerNodePrefix, encoder.toBytes(srv)); - - // start the creation of znodes - znode.start(); - - // We'll wait for 120s for node creation - long znodeCreationTimeout = 120; - if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { - throw new Exception( - "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); - } - - znodePath = znode.getActualPath(); - - if (doCheckAcls) { - try { - checkAndSetAcls(); - } catch (Exception ex) { - throw new IOException("Error validating or 
setting ACLs. " + disableMessage, ex); - } - } - if (zooKeeperClient.checkExists().forPath(znodePath) == null) { - // No node exists, throw exception - throw new Exception("Unable to create znode for this instance on ZooKeeper."); - } - } catch (Exception e) { - LOG.error("Unable to create a znode for this server instance", e); - CloseableUtils.closeQuietly(znode); - throw (e instanceof IOException) ? (IOException)e : new IOException(e); - } - return uniqueId; - } - - protected final void updateServiceRecord( - ServiceRecord srv, boolean doCheckAcls, boolean closeOnFailure) throws IOException { - if (srv.get(UNIQUE_IDENTIFIER) == null) { - srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); - } - // waitForInitialCreate must have already been called in registerServiceRecord. - try { - znode.setData(encoder.toBytes(srv)); - - if (doCheckAcls) { - try { - checkAndSetAcls(); - } catch (Exception ex) { - throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); - } - } - } catch (Exception e) { - LOG.error("Unable to update znode with new service record", e); - if (closeOnFailure) { - CloseableUtils.closeQuietly(znode); - } - throw (e instanceof IOException) ? (IOException) e : new IOException(e); - } - } - - @VisibleForTesting - public String getPersistentNodePath() { - return "/" + PATH_JOINER.join(namespace, StringUtils.substringBetween(workersPath, "/", "/"), "pnode0"); - } - - protected void ensurePersistentNodePath(ServiceRecord srv) throws IOException { - String pNodePath = getPersistentNodePath(); - try { - LOG.info("Check if persistent node path {} exists, create if not", pNodePath); - if (zooKeeperClient.checkExists().forPath(pNodePath) == null) { - zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) - .forPath(pNodePath, encoder.toBytes(srv)); - LOG.info("Created persistent path at: {}", pNodePath); - } - } catch (Exception e) { - // throw exception if it is other than NODEEXISTS. 
- if (!(e instanceof KeeperException) || ((KeeperException) e).code() != KeeperException.Code.NODEEXISTS) { - LOG.error("Unable to create a persistent znode for this server instance", e); - throw new IOException(e); - } else { - LOG.debug("Ignoring KeeperException while ensuring path as the parent node {} already exists.", pNodePath); - } - } - } - - final protected void initializeWithoutRegisteringInternal() throws IOException { - // Create a znode under the rootNamespace parent for this instance of the server - try { - try { - zooKeeperClient.create().creatingParentsIfNeeded().forPath(workersPath); - } catch (NodeExistsException ex) { - // Ignore - this is expected. - } - if (doCheckAcls) { - try { - checkAndSetAcls(); - } catch (Exception ex) { - throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); - } - } - } catch (Exception e) { - LOG.error("Unable to create a parent znode for the registry", e); - throw (e instanceof IOException) ? (IOException)e : new IOException(e); - } - } - - private void checkAndSetAcls() throws Exception { - if (!isZkEnforceSASLClient(conf)) { - return; - } - // We are trying to check ACLs on the "workers" directory, which noone except us should be - // able to write to. Higher-level directories shouldn't matter - we don't read them. - String pathToCheck = workersPath; - List acls = zooKeeperClient.getACL().forPath(pathToCheck); - if (acls == null || acls.isEmpty()) { - // Can there be no ACLs? There's some access (to get ACLs), so assume it means free for all. - LOG.warn("No ACLs on " + pathToCheck + "; setting up ACLs. " + disableMessage); - setUpAcls(pathToCheck); - return; - } - // This could be brittle. - assert userNameFromPrincipal != null; - Id currentUser = new Id("sasl", userNameFromPrincipal); - for (ACL acl : acls) { - if ((acl.getPerms() & ~ZooDefs.Perms.READ) == 0 || currentUser.equals(acl.getId())) { - continue; // Read permission/no permissions, or the expected user. 
- } - LOG.warn("The ACL " + acl + " is unnacceptable for " + pathToCheck - + "; setting up ACLs. " + disableMessage); - setUpAcls(pathToCheck); - return; - } - } - - private void setUpAcls(String path) throws Exception { - List acls = createSecureAcls(); - LinkedList paths = new LinkedList<>(); - paths.add(path); - while (!paths.isEmpty()) { - String currentPath = paths.poll(); - List children = zooKeeperClient.getChildren().forPath(currentPath); - if (children != null) { - for (String child : children) { - paths.add(currentPath + "/" + child); - } - } - zooKeeperClient.setACL().withACL(acls).forPath(currentPath); - } - } - - private void addToCache(String path, String host, InstanceType instance) { - instanceCacheLock.lock(); - try { - putInInstanceCache(path, pathToInstanceCache, instance); - putInNodeCache(host, nodeToInstanceCache, instance); - } finally { - instanceCacheLock.unlock(); - } - LOG.debug("Added path={}, host={} instance={} to cache." - + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", - path, host, instance, pathToInstanceCache.size(), nodeToInstanceCache.size()); - } - - private void removeFromCache(String path, String host) { - instanceCacheLock.lock(); - try { - pathToInstanceCache.remove(path); - nodeToInstanceCache.remove(host); - } finally { - instanceCacheLock.unlock(); - } - LOG.debug("Removed path={}, host={} from cache." 
- + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", - path, host, pathToInstanceCache.size(), nodeToInstanceCache.size()); - } - - private void putInInstanceCache(String key, Map cache, - InstanceType instance) { - cache.put(key, instance); - } - - private void putInNodeCache(String key, Map> cache, - InstanceType instance) { - Set instanceSet = cache.get(key); - if (instanceSet == null) { - instanceSet = new HashSet<>(); - instanceSet.add(instance); - } - cache.put(key, instanceSet); - } - - protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) { - for (ChildData childData : instancesCache.getCurrentData()) { - byte[] data = getWorkerData(childData, workerNodePrefix); - if (data == null) continue; - String nodeName = extractNodeName(childData); - if (!isLlapWorker(nodeName, workerNodePrefix)) continue; - int ephSeqVersion = extractSeqNum(nodeName); - try { - ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); - InstanceType instance = createServiceInstance(srv); - addToCache(childData.getPath(), instance.getHost(), instance); - if (doInvokeListeners) { - for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { - listener.onCreate(instance, ephSeqVersion); - } - } - } catch (IOException e) { - LOG.error("Unable to decode data for zkpath: {}." 
+ - " Ignoring from current instances list..", childData.getPath()); - } - } - } - - private static boolean isLlapWorker(String nodeName, String workerNodePrefix) { - return nodeName.startsWith(workerNodePrefix) && nodeName.length() > workerNodePrefix.length(); - } - - protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException; - - protected static byte[] getWorkerData(ChildData childData, String workerNodePrefix) { - if (childData == null) return null; - byte[] data = childData.getData(); - if (data == null) return null; - if (!isLlapWorker(extractNodeName(childData), workerNodePrefix)) return null; - return data; - } - - private class InstanceStateChangeListener implements PathChildrenCacheListener { - private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class); - - @Override - public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) { - Preconditions.checkArgument(client != null - && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); - - synchronized (this) { - ChildData childData = event.getData(); - if (childData == null) return; - String nodeName = extractNodeName(childData); - if (nodeName.equals(workerNodePrefix)) { - LOG.warn("Invalid LLAP worker node name: {} was {}", childData.getPath(), event.getType()); - } - if (!isLlapWorker(nodeName, workerNodePrefix)) return; - LOG.info("{} for zknode {}", event.getType(), childData.getPath()); - InstanceType instance = extractServiceInstance(event, childData); - if (instance != null) { - int ephSeqVersion = extractSeqNum(nodeName); - switch (event.getType()) { - case CHILD_ADDED: - addToCache(childData.getPath(), instance.getHost(), instance); - for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { - listener.onCreate(instance, ephSeqVersion); - } - break; - case CHILD_UPDATED: - addToCache(childData.getPath(), instance.getHost(), instance); - for 
(ServiceInstanceStateChangeListener listener : stateChangeListeners) { - listener.onUpdate(instance, ephSeqVersion); - } - break; - case CHILD_REMOVED: - removeFromCache(childData.getPath(), instance.getHost()); - for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { - listener.onRemove(instance, ephSeqVersion); - } - break; - default: - // Ignore all the other events; logged above. - } - } else { - LOG.info("instance is null for event: {} childData: {}", event.getType(), childData); - } - } - } - } - - // The real implementation for the instanceset... instanceset has its own copy of the - // ZK cache yet completely depends on the parent in every other aspect and is thus unneeded. - - protected final int sizeInternal() { - // not using the path child cache here as there could be more than 1 path per host (worker and slot znodes) - return nodeToInstanceCache.size(); - } - - protected final Set getByHostInternal(String host) { - Set byHost = nodeToInstanceCache.get(host); - byHost = (byHost == null) ? Sets.newHashSet() : byHost; - if (LOG.isDebugEnabled()) { - LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); - } - return byHost; - } - - protected final Collection getAllInternal() { - return new HashSet<>(pathToInstanceCache.values()); - } - - private static String extractNodeName(ChildData childData) { - String nodeName = childData.getPath(); - int ix = nodeName.lastIndexOf("/"); - if (ix >= 0) { - nodeName = nodeName.substring(ix + 1); - } - return nodeName; - } - - private InstanceType extractServiceInstance( - PathChildrenCacheEvent event, ChildData childData) { - byte[] data = childData.getData(); - if (data == null) return null; - try { - ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data); - return createServiceInstance(srv); - } catch (IOException e) { - LOG.error("Unable to decode data for zknode: {}." 
+ - " Dropping notification of type: {}", childData.getPath(), event.getType()); - return null; - } - } - - public synchronized void registerStateChangeListener( - ServiceInstanceStateChangeListener listener) throws IOException { - ensureInstancesCache(0); - this.stateChangeListeners.add(listener); - } - - @SuppressWarnings("resource") // Bogus warnings despite closeQuietly. - protected final synchronized PathChildrenCache ensureInstancesCache( - long clusterReadyTimeoutMs) throws IOException { - Preconditions.checkArgument(zooKeeperClient != null && - zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started"); - // lazily create PathChildrenCache - PathChildrenCache instancesCache = this.instancesCache; - if (instancesCache != null) return instancesCache; - ExecutorService tp = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("StateChangeNotificationHandler").build()); - long startTimeNs = System.nanoTime(), deltaNs = clusterReadyTimeoutMs * 1000000L; - long sleepTimeMs = Math.min(16, clusterReadyTimeoutMs); - while (true) { - instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true); - instancesCache.getListenable().addListener(new InstanceStateChangeListener(), tp); - try { - instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); - this.instancesCache = instancesCache; - return instancesCache; - } catch (InvalidACLException e) { - // PathChildrenCache tried to mkdir when the znode wasn't there, and failed. 
- CloseableUtils.closeQuietly(instancesCache); - long elapsedNs = System.nanoTime() - startTimeNs; - if (deltaNs == 0 || deltaNs <= elapsedNs) { - LOG.error("Unable to start curator PathChildrenCache", e); - throw new IOException(e); - } - LOG.warn("The cluster is not started yet (InvalidACL); will retry"); - try { - Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs)/1000000L)); - } catch (InterruptedException e1) { - LOG.error("Interrupted while retrying the PathChildrenCache startup"); - throw new IOException(e1); - } - sleepTimeMs = sleepTimeMs << 1; - } catch (Exception e) { - CloseableUtils.closeQuietly(instancesCache); - LOG.error("Unable to start curator PathChildrenCache", e); - throw new IOException(e); - } - } + * Override to use HiveConf for checking ZooKeeper SASL enforcement. + */ + @Override + protected boolean isZkEnforceSASLClient(Configuration conf) { + return UserGroupInformation.isSecurityEnabled() && + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_USE_KERBEROS); } + /** + * Override to use LlapUtil for extracting username from principal. 
+ */ + @Override public void start() throws IOException { if (zooKeeperClient != null) { if (isZkEnforceSASLClient(conf)) { @@ -676,62 +119,11 @@ public void start() throws IOException { SecurityUtils.setZookeeperClientKerberosJaasConfig(zkPrincipal, zkKeytab, saslLoginContextName); } if (zkPrincipal != null) { + // Use LlapUtil for Hive components userNameFromPrincipal = LlapUtil.getUserNameFromPrincipal(zkPrincipal); } } zooKeeperClient.start(); } - // Init closeable utils in case register is not called (see HIVE-13322) - CloseableUtils.class.getName(); - } - - private static boolean isZkEnforceSASLClient(Configuration conf) { - return UserGroupInformation.isSecurityEnabled() && - HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_USE_KERBEROS); - } - - protected void unregisterInternal() { - CloseableUtils.closeQuietly(znode); - } - - public void stop() { - CloseableUtils.closeQuietly(znode); - CloseableUtils.closeQuietly(instancesCache); - CloseableUtils.closeQuietly(zooKeeperClient); - } - - protected final InstanceType getInstanceByPath(String path) { - return pathToInstanceCache.get(path); - } - - protected final String getRegistrationZnodePath() { - return znodePath; - } - - private int extractSeqNum(String nodeName) { - // Extract the sequence number of this ephemeral-sequential znode. - String ephSeqVersionStr = nodeName.substring(workerNodePrefix.length()); - try { - return Integer.parseInt(ephSeqVersionStr); - } catch (NumberFormatException e) { - LOG.error("Cannot parse " + ephSeqVersionStr + " from " + nodeName, e); - throw e; - } - } - - // for debugging - private class ZkConnectionStateListener implements ConnectionStateListener { - @Override - public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) { - LOG.info("Connection state change notification received. 
State: {}", connectionState); - } - } - - public String currentUser() { - try { - return UserGroupInformation.getCurrentUser().getShortUserName(); - } catch (IOException e) { - throw new RuntimeException(e); - } } } diff --git a/standalone-metastore/metastore-common/pom.xml b/standalone-metastore/metastore-common/pom.xml index 640e68d8ad18..08144b392a0e 100644 --- a/standalone-metastore/metastore-common/pom.xml +++ b/standalone-metastore/metastore-common/pom.xml @@ -235,6 +235,16 @@ + + org.apache.hadoop + hadoop-yarn-registry + ${hadoop.version} + + + org.apache.hadoop + hadoop-yarn-api + ${hadoop.version} + org.apache.hive hive-storage-api diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java index c7da6259f53a..64e09dfe7900 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java @@ -228,11 +228,30 @@ public static ZooKeeperHiveHelper.ZooKeeperHiveHelperBuilder builder() { public ZooKeeperHiveHelper(ZooKeeperHiveHelperBuilder builder) { // Get the ensemble server addresses in the format host1:port1, host2:port2, ... . Append // the configured port to hostname if the hostname doesn't contain a port. - String[] hosts = builder.getQuorum().split(","); + String quorumInput = builder.getQuorum(); + if (quorumInput == null || quorumInput.trim().isEmpty()) { + throw new IllegalArgumentException("ZooKeeper quorum cannot be null or empty"); + } + // Clean up any protocol prefixes (thrift://, //, etc.) 
+ quorumInput = quorumInput.trim(); + if (quorumInput.startsWith("//")) { + quorumInput = quorumInput.substring(2); + } + int protocolIndex = quorumInput.indexOf("://"); + if (protocolIndex >= 0) { + quorumInput = quorumInput.substring(protocolIndex + 3); + } + + String[] hosts = quorumInput.split(","); StringBuilder quorumServers = new StringBuilder(); for (int i = 0; i < hosts.length; i++) { - quorumServers.append(hosts[i].trim()); - if (!hosts[i].contains(":")) { + String host = hosts[i].trim(); + // Clean up any remaining protocol artifacts + if (host.startsWith("//")) { + host = host.substring(2); + } + quorumServers.append(host); + if (!host.contains(":")) { quorumServers.append(":"); quorumServers.append(builder.getClientPort()); } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 85b37d67889d..69432c0ad8ed 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1942,6 +1942,23 @@ public enum ConfVars { "hive.metastore.iceberg.catalog.cache.expiry", -1, "HMS Iceberg Catalog cache expiry." ), + REST_CATALOG_HA_ENABLED("metastore.rest.catalog.ha.enabled", + "hive.metastore.rest.catalog.ha.enabled", false, + "Whether REST Catalog supports High Availability with ZooKeeper service discovery." + ), + REST_CATALOG_HA_REGISTRY_NAMESPACE("metastore.rest.catalog.ha.registry.namespace", + "hive.metastore.rest.catalog.ha.registry.namespace", "restCatalogHA", + "The parent node in ZooKeeper used by REST Catalog when supporting HA." 
+ ), + REST_CATALOG_HA_MODE("metastore.rest.catalog.ha.mode", + "hive.metastore.rest.catalog.ha.mode", "active-passive", + new StringSetValidator("active-passive", "active-active"), + "REST Catalog HA mode: 'active-passive' for leader election, 'active-active' for load balancing." + ), + REST_CATALOG_INSTANCE_URI("metastore.rest.catalog.instance.uri", + "hive.metastore.rest.catalog.instance.uri", "", + "REST Catalog instance URI (host:port) for service discovery registration." + ), HTTPSERVER_THREADPOOL_MIN("hive.metastore.httpserver.threadpool.min", "hive.metastore.httpserver.threadpool.min", 8, "HMS embedded HTTP server minimum number of threads." diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/registry/impl/ZkRegistryBase.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/registry/impl/ZkRegistryBase.java new file mode 100644 index 000000000000..75da88ab4249 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/registry/impl/ZkRegistryBase.java @@ -0,0 +1,784 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.registry.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.nodes.PersistentNode; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.CloseableUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ZooKeeperHiveHelper; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.hadoop.hive.registry.RegistryUtilities; +import org.apache.hadoop.hive.registry.ServiceInstance; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import 
org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.InvalidACLException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * This is currently used for implementation inheritance only; it doesn't provide a unified flow + * into which one can just plug a few abstract method implementations, because providing one with + * getInstance method is a huge pain involving lots of generics. Also, different registries may + * have slightly different usage patterns anyway and noone would use a registry without knowing + * what type it is. So, it's mostly a grab bag of methods used by ServiceInstanceSet and other + * parts of each implementation. + * + * Adapted to use MetastoreConf instead of HiveConf for HMS compatibility. 
+ */ +public abstract class ZkRegistryBase { + private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class); + private final static String SASL_NAMESPACE = "sasl"; + private final static String UNSECURE_NAMESPACE = "unsecure"; + protected final static String USER_SCOPE_PATH_PREFIX = "user-"; + protected static final String WORKER_PREFIX = "worker-"; + protected static final String WORKER_GROUP = "workers"; + public static final String UNIQUE_IDENTIFIER = "registry.unique.id"; + protected static final UUID UNIQUE_ID = UUID.randomUUID(); + private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls(); + + protected final Configuration conf; + protected final CuratorFramework zooKeeperClient; + // workersPath is the directory path where all the worker znodes are located. + protected final String workersPath; + private final String workerNodePrefix; + + protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data + + private final Set> stateChangeListeners; + + protected final boolean doCheckAcls; + // Secure ZK is only set up by the registering service; anyone can read the registrations. + protected final String zkPrincipal, zkKeytab, saslLoginContextName; + protected String userNameFromPrincipal; // Only set when setting up the secure config for ZK. + protected final String disableMessage; + + private final Lock instanceCacheLock = new ReentrantLock(); + // there can be only one instance per path + private final Map pathToInstanceCache; + // there can be multiple instances per node + private final Map> nodeToInstanceCache; + + // The registration znode. + private PersistentNode znode; + private String znodePath; // unique identity for this instance + + final String namespace; + + private PathChildrenCache instancesCache; // Created on demand. + + /** Local hostname. */ + protected static final String hostname = RegistryUtilities.getCanonicalHostName(); + + /** + * @param rootNs A single root namespace override. 
Not recommended. + * @param nsPrefix The namespace prefix to use with default namespaces (appends 'sasl' for secure else 'unsecure' + * to namespace prefix to get effective root namespace). + * @param userScopePathPrefix The prefix to use for the user-specific part of the path. + * @param workerPrefix The prefix to use for each worker znode. + * @param workerGroup group name to use for all workers + * @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed. + * @param zkPrincipal ZK security principal. + * @param zkKeytab ZK security keytab. + * @param aclsConfig A config setting to use to determine if ACLs should be verified. Can be null to default to true. + */ + public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix, + String userScopePathPrefix, String workerPrefix, String workerGroup, + String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) { + this.conf = new Configuration(conf); + this.saslLoginContextName = zkSaslLoginContextName; + this.zkPrincipal = zkPrincipal; + this.zkKeytab = zkKeytab; + if (aclsConfig != null) { + this.doCheckAcls = MetastoreConf.getBoolVar(conf, aclsConfig); + this.disableMessage = "Set " + aclsConfig.getVarname() + " to false to disable ACL validation"; + } else { + this.doCheckAcls = true; + this.disableMessage = ""; + } + this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + this.encoder = new RegistryUtils.ServiceRecordMarshal(); + + // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000 + // worker-0000000 is the sequence number which will be retained until session timeout. If a + // worker does not respond due to communication interruptions it will retain the same sequence + // number when it returns back. 
If session timeout expires, the node will be deleted and new + // addition of the same node (restart) will get next sequence number + final String userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf); + this.workerNodePrefix = workerPrefix == null ? WORKER_PREFIX : workerPrefix; + this.workersPath = "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup); + this.instancesCache = null; + this.stateChangeListeners = new HashSet<>(); + this.pathToInstanceCache = new ConcurrentHashMap<>(); + this.nodeToInstanceCache = new ConcurrentHashMap<>(); + this.namespace = getRootNamespaceInternal(conf, rootNs, nsPrefix); + ACLProvider aclProvider; + // get acl provider for most outer path that is non-null + if (userPathPrefix == null) { + if (instanceName == null) { + if (workerGroup == null) { + aclProvider = getACLProviderForZKPath(namespace); + } else { + aclProvider = getACLProviderForZKPath(workerGroup); + } + } else { + aclProvider = getACLProviderForZKPath(instanceName); + } + } else { + aclProvider = getACLProviderForZKPath(userScopePathPrefix); + } + this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider); + this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener()); + } + + public static String getRootNamespace(Configuration conf, String userProvidedNamespace, + String defaultNamespacePrefix) { + // Use static helper to check security - this is called from static contexts + final boolean isSecure = UserGroupInformation.isSecurityEnabled() && + MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_USE_KERBEROS); + String rootNs = userProvidedNamespace; + if (rootNs == null) { + rootNs = defaultNamespacePrefix + (isSecure ? 
SASL_NAMESPACE : UNSECURE_NAMESPACE); + } + return rootNs; + } + + protected String getRootNamespaceInternal(Configuration conf, String userProvidedNamespace, + String defaultNamespacePrefix) { + final boolean isSecure = isZkEnforceSASLClient(conf); + String rootNs = userProvidedNamespace; + if (rootNs == null) { + rootNs = defaultNamespacePrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); + } + return rootNs; + } + + private ACLProvider getACLProviderForZKPath(String zkPath) { + final boolean isSecure = isZkEnforceSASLClient(conf); + return new ACLProvider() { + @Override + public List getDefaultAcl() { + // We always return something from getAclForPath so this should not happen. + LOG.warn("getDefaultAcl was called"); + return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + @Override + public List getAclForPath(String path) { + if (!isSecure || path == null || !path.contains(zkPath)) { + // No security or the path is below the user path - full access. + return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + return createSecureAcls(); + } + }; + } + + protected CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) { + String keyStorePassword = ""; + String trustStorePassword = ""; + if (MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_ENABLE)) { + try { + keyStorePassword = MetastoreConf.getPassword(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_PASSWORD); + trustStorePassword = MetastoreConf.getPassword(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD); + } catch (IOException e) { + throw new RuntimeException("Failed to read zookeeper conf passwords", e); + } + } + // Prefer hive.zookeeper.quorum for ZooKeeper connection (explicit ZooKeeper config) + // Fallback to THRIFT_URIS only if hive.zookeeper.quorum is not set + // This allows REST Catalog HA to use ZooKeeper independently from HMS URIs + String quorum = conf.get("hive.zookeeper.quorum"); + if (quorum == null || 
quorum.trim().isEmpty()) { + // Fallback: use THRIFT_URIS for ZooKeeper quorum (as in MetastoreConf.getZKConfig) + // This maintains backward compatibility for HMS use cases + quorum = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS); + // If THRIFT_URIS contains "thrift://" prefix, strip it as it's not valid for ZooKeeper + if (quorum != null && quorum.startsWith("thrift://")) { + quorum = quorum.substring("thrift://".length()); + } + } + // Clean up any remaining protocol prefixes or double slashes + if (quorum != null) { + quorum = quorum.trim(); + // Remove any remaining protocol prefixes + if (quorum.startsWith("//")) { + quorum = quorum.substring(2); + } + // Remove any other protocol prefixes that might have been missed + int protocolIndex = quorum.indexOf("://"); + if (protocolIndex >= 0) { + quorum = quorum.substring(protocolIndex + 3); + } + } + if (quorum == null || quorum.isEmpty()) { + throw new IllegalArgumentException("ZooKeeper quorum is not configured. " + + "Set hive.zookeeper.quorum or ensure THRIFT_URIS contains valid ZooKeeper addresses."); + } + return ZooKeeperHiveHelper.builder() + .quorum(quorum) + .clientPort(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT)) + .connectionTimeout((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) + .sessionTimeout((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS)) + .baseSleepTime((int) MetastoreConf.getTimeVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS)) + .maxRetries(MetastoreConf.getIntVar(conf, ConfVars.THRIFT_ZOOKEEPER_CONNECTION_MAX_RETRIES)) + .sslEnabled(MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_ENABLE)) + .keyStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_LOCATION)) + .keyStorePassword(keyStorePassword) + .keyStoreType(MetastoreConf.getVar(conf, 
ConfVars.THRIFT_ZOOKEEPER_SSL_KEYSTORE_TYPE)) + .trustStoreLocation(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION)) + .trustStorePassword(trustStorePassword) + .trustStoreType(MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_SSL_TRUSTSTORE_TYPE)) + .build().getNewZookeeperClient(zooKeeperAclProvider, namespace); + } + + private static List createSecureAcls() { + // Read all to the world + List nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE); + // Create/Delete/Write/Admin to creator + nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL); + return nodeAcls; + } + + /** + * Get the ensemble server addresses from the configuration. The format is: host1:port, + * host2:port.. + * + * @param conf configuration + **/ + private static String getQuorumServers(Configuration conf) { + // Prefer hive.zookeeper.quorum for ZooKeeper connection + String quorum = conf.get("hive.zookeeper.quorum", ""); + if (quorum == null || quorum.isEmpty()) { + quorum = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS); + // If THRIFT_URIS contains "thrift://" prefix, strip it as it's not valid for ZooKeeper + if (quorum != null && quorum.startsWith("thrift://")) { + quorum = quorum.substring("thrift://".length()); + } + } + String[] hosts = quorum.split(","); + String port = MetastoreConf.getVar(conf, ConfVars.THRIFT_ZOOKEEPER_CLIENT_PORT); + StringBuilder quorumBuilder = new StringBuilder(); + for (int i = 0; i < hosts.length; i++) { + quorumBuilder.append(hosts[i].trim()); + if (!hosts[i].contains(":")) { + // if the hostname doesn't contain a port, add the configured port to hostname + quorumBuilder.append(":"); + quorumBuilder.append(port); + } + + if (i != hosts.length - 1) { + quorumBuilder.append(","); + } + } + + return quorumBuilder.toString(); + } + + protected abstract String getZkPathUser(Configuration conf); + + protected final String registerServiceRecord(ServiceRecord srv) throws IOException { + return registerServiceRecord(srv, 
UNIQUE_ID.toString()); + } + + protected final String registerServiceRecord(ServiceRecord srv, final String uniqueId) throws IOException { + // restart sensitive instance id + srv.set(UNIQUE_IDENTIFIER, uniqueId); + + // Create a znode under the rootNamespace parent for this instance of the server + try { + // PersistentNode will make sure the ephemeral node created on server will be present + // even under connection or session interruption (will automatically handle retries) + znode = new PersistentNode(zooKeeperClient, CreateMode.EPHEMERAL_SEQUENTIAL, false, + workersPath + "/" + workerNodePrefix, encoder.toBytes(srv)); + + // start the creation of znodes + znode.start(); + + // We'll wait for 120s for node creation + long znodeCreationTimeout = 120; + if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { + throw new Exception( + "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); + } + + znodePath = znode.getActualPath(); + + if (doCheckAcls) { + try { + checkAndSetAcls(); + } catch (Exception ex) { + throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); + } + } + if (zooKeeperClient.checkExists().forPath(znodePath) == null) { + // No node exists, throw exception + throw new Exception("Unable to create znode for this instance on ZooKeeper."); + } + } catch (Exception e) { + LOG.error("Unable to create a znode for this server instance", e); + CloseableUtils.closeQuietly(znode); + throw (e instanceof IOException) ? (IOException)e : new IOException(e); + } + return uniqueId; + } + + protected final void updateServiceRecord( + ServiceRecord srv, boolean doCheckAcls, boolean closeOnFailure) throws IOException { + if (srv.get(UNIQUE_IDENTIFIER) == null) { + srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString()); + } + // waitForInitialCreate must have already been called in registerServiceRecord. 
+ try { + znode.setData(encoder.toBytes(srv)); + + if (doCheckAcls) { + try { + checkAndSetAcls(); + } catch (Exception ex) { + throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); + } + } + } catch (Exception e) { + LOG.error("Unable to update znode with new service record", e); + if (closeOnFailure) { + CloseableUtils.closeQuietly(znode); + } + throw (e instanceof IOException) ? (IOException) e : new IOException(e); + } + } + + @VisibleForTesting + public String getPersistentNodePath() { + return "/" + PATH_JOINER.join(namespace, StringUtils.substringBetween(workersPath, "/", "/"), "pnode0"); + } + + protected void ensurePersistentNodePath(ServiceRecord srv) throws IOException { + String pNodePath = getPersistentNodePath(); + try { + LOG.info("Check if persistent node path {} exists, create if not", pNodePath); + if (zooKeeperClient.checkExists().forPath(pNodePath) == null) { + zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .forPath(pNodePath, encoder.toBytes(srv)); + LOG.info("Created persistent path at: {}", pNodePath); + } + } catch (Exception e) { + // throw exception if it is other than NODEEXISTS. + if (!(e instanceof KeeperException) || ((KeeperException) e).code() != KeeperException.Code.NODEEXISTS) { + LOG.error("Unable to create a persistent znode for this server instance", e); + throw new IOException(e); + } else { + LOG.debug("Ignoring KeeperException while ensuring path as the parent node {} already exists.", pNodePath); + } + } + } + + final protected void initializeWithoutRegisteringInternal() throws IOException { + // Create a znode under the rootNamespace parent for this instance of the server + try { + try { + zooKeeperClient.create().creatingParentsIfNeeded().forPath(workersPath); + } catch (NodeExistsException ex) { + // Ignore - this is expected. 
+ } + if (doCheckAcls) { + try { + checkAndSetAcls(); + } catch (Exception ex) { + throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); + } + } + } catch (Exception e) { + LOG.error("Unable to create a parent znode for the registry", e); + throw (e instanceof IOException) ? (IOException)e : new IOException(e); + } + } + + private void checkAndSetAcls() throws Exception { + if (!isZkEnforceSASLClient(conf)) { + return; + } + // We are trying to check ACLs on the "workers" directory, which noone except us should be + // able to write to. Higher-level directories shouldn't matter - we don't read them. + String pathToCheck = workersPath; + List acls = zooKeeperClient.getACL().forPath(pathToCheck); + if (acls == null || acls.isEmpty()) { + // Can there be no ACLs? There's some access (to get ACLs), so assume it means free for all. + LOG.warn("No ACLs on " + pathToCheck + "; setting up ACLs. " + disableMessage); + setUpAcls(pathToCheck); + return; + } + // This could be brittle. + assert userNameFromPrincipal != null; + Id currentUser = new Id("sasl", userNameFromPrincipal); + for (ACL acl : acls) { + if ((acl.getPerms() & ~ZooDefs.Perms.READ) == 0 || currentUser.equals(acl.getId())) { + continue; // Read permission/no permissions, or the expected user. + } + LOG.warn("The ACL " + acl + " is unnacceptable for " + pathToCheck + + "; setting up ACLs. 
" + disableMessage); + setUpAcls(pathToCheck); + return; + } + } + + private void setUpAcls(String path) throws Exception { + List acls = createSecureAcls(); + LinkedList paths = new LinkedList<>(); + paths.add(path); + while (!paths.isEmpty()) { + String currentPath = paths.poll(); + List children = zooKeeperClient.getChildren().forPath(currentPath); + if (children != null) { + for (String child : children) { + paths.add(currentPath + "/" + child); + } + } + zooKeeperClient.setACL().withACL(acls).forPath(currentPath); + } + } + + private void addToCache(String path, String host, InstanceType instance) { + instanceCacheLock.lock(); + try { + putInInstanceCache(path, pathToInstanceCache, instance); + putInNodeCache(host, nodeToInstanceCache, instance); + } finally { + instanceCacheLock.unlock(); + } + LOG.debug("Added path={}, host={} instance={} to cache." + + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", + path, host, instance, pathToInstanceCache.size(), nodeToInstanceCache.size()); + } + + private void removeFromCache(String path, String host) { + instanceCacheLock.lock(); + try { + pathToInstanceCache.remove(path); + nodeToInstanceCache.remove(host); + } finally { + instanceCacheLock.unlock(); + } + LOG.debug("Removed path={}, host={} from cache." 
+ + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}", + path, host, pathToInstanceCache.size(), nodeToInstanceCache.size()); + } + + private void putInInstanceCache(String key, Map cache, + InstanceType instance) { + cache.put(key, instance); + } + + private void putInNodeCache(String key, Map> cache, + InstanceType instance) { + Set instanceSet = cache.get(key); + if (instanceSet == null) { + instanceSet = new HashSet<>(); + instanceSet.add(instance); + } + cache.put(key, instanceSet); + } + + protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) { + for (ChildData childData : instancesCache.getCurrentData()) { + byte[] data = getWorkerData(childData, workerNodePrefix); + if (data == null) continue; + String nodeName = extractNodeName(childData); + if (!isLlapWorker(nodeName, workerNodePrefix)) continue; + int ephSeqVersion = extractSeqNum(nodeName); + try { + ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); + InstanceType instance = createServiceInstance(srv); + addToCache(childData.getPath(), instance.getHost(), instance); + if (doInvokeListeners) { + for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { + listener.onCreate(instance, ephSeqVersion); + } + } + } catch (IOException e) { + LOG.error("Unable to decode data for zkpath: {}." 
+ + " Ignoring from current instances list..", childData.getPath()); + } + } + } + + private static boolean isLlapWorker(String nodeName, String workerNodePrefix) { + return nodeName.startsWith(workerNodePrefix) && nodeName.length() > workerNodePrefix.length(); + } + + protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException; + + protected static byte[] getWorkerData(ChildData childData, String workerNodePrefix) { + if (childData == null) return null; + byte[] data = childData.getData(); + if (data == null) return null; + if (!isLlapWorker(extractNodeName(childData), workerNodePrefix)) return null; + return data; + } + + private class InstanceStateChangeListener implements PathChildrenCacheListener { + private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class); + + @Override + public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) { + Preconditions.checkArgument(client != null + && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); + + synchronized (this) { + ChildData childData = event.getData(); + if (childData == null) return; + String nodeName = extractNodeName(childData); + if (nodeName.equals(workerNodePrefix)) { + LOG.warn("Invalid LLAP worker node name: {} was {}", childData.getPath(), event.getType()); + } + if (!isLlapWorker(nodeName, workerNodePrefix)) return; + LOG.info("{} for zknode {}", event.getType(), childData.getPath()); + InstanceType instance = extractServiceInstance(event, childData); + if (instance != null) { + int ephSeqVersion = extractSeqNum(nodeName); + switch (event.getType()) { + case CHILD_ADDED: + addToCache(childData.getPath(), instance.getHost(), instance); + for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { + listener.onCreate(instance, ephSeqVersion); + } + break; + case CHILD_UPDATED: + addToCache(childData.getPath(), instance.getHost(), instance); + for 
(ServiceInstanceStateChangeListener listener : stateChangeListeners) { + listener.onUpdate(instance, ephSeqVersion); + } + break; + case CHILD_REMOVED: + removeFromCache(childData.getPath(), instance.getHost()); + for (ServiceInstanceStateChangeListener listener : stateChangeListeners) { + listener.onRemove(instance, ephSeqVersion); + } + break; + default: + // Ignore all the other events; logged above. + } + } else { + LOG.info("instance is null for event: {} childData: {}", event.getType(), childData); + } + } + } + } + + // The real implementation for the instanceset... instanceset has its own copy of the + // ZK cache yet completely depends on the parent in every other aspect and is thus unneeded. + + protected final int sizeInternal() { + // not using the path child cache here as there could be more than 1 path per host (worker and slot znodes) + return nodeToInstanceCache.size(); + } + + protected final Set getByHostInternal(String host) { + Set byHost = nodeToInstanceCache.get(host); + byHost = (byHost == null) ? Sets.newHashSet() : byHost; + if (LOG.isDebugEnabled()) { + LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host); + } + return byHost; + } + + protected final Collection getAllInternal() { + return new HashSet<>(pathToInstanceCache.values()); + } + + private static String extractNodeName(ChildData childData) { + String nodeName = childData.getPath(); + int ix = nodeName.lastIndexOf("/"); + if (ix >= 0) { + nodeName = nodeName.substring(ix + 1); + } + return nodeName; + } + + private InstanceType extractServiceInstance( + PathChildrenCacheEvent event, ChildData childData) { + byte[] data = childData.getData(); + if (data == null) return null; + try { + ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data); + return createServiceInstance(srv); + } catch (IOException e) { + LOG.error("Unable to decode data for zknode: {}." 
+ + " Dropping notification of type: {}", childData.getPath(), event.getType()); + return null; + } + } + + public synchronized void registerStateChangeListener( + ServiceInstanceStateChangeListener listener) throws IOException { + ensureInstancesCache(0); + this.stateChangeListeners.add(listener); + } + + @SuppressWarnings("resource") // Bogus warnings despite closeQuietly. + protected final synchronized PathChildrenCache ensureInstancesCache( + long clusterReadyTimeoutMs) throws IOException { + Preconditions.checkArgument(zooKeeperClient != null && + zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started"); + // lazily create PathChildrenCache + PathChildrenCache instancesCache = this.instancesCache; + if (instancesCache != null) return instancesCache; + ExecutorService tp = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("StateChangeNotificationHandler").build()); + long startTimeNs = System.nanoTime(), deltaNs = clusterReadyTimeoutMs * 1000000L; + long sleepTimeMs = Math.min(16, clusterReadyTimeoutMs); + while (true) { + instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true); + instancesCache.getListenable().addListener(new InstanceStateChangeListener(), tp); + try { + instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + this.instancesCache = instancesCache; + return instancesCache; + } catch (InvalidACLException e) { + // PathChildrenCache tried to mkdir when the znode wasn't there, and failed. 
+ CloseableUtils.closeQuietly(instancesCache); + long elapsedNs = System.nanoTime() - startTimeNs; + if (deltaNs == 0 || deltaNs <= elapsedNs) { + LOG.error("Unable to start curator PathChildrenCache", e); + throw new IOException(e); + } + LOG.warn("The cluster is not started yet (InvalidACL); will retry"); + try { + Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs)/1000000L)); + } catch (InterruptedException e1) { + LOG.error("Interrupted while retrying the PathChildrenCache startup"); + throw new IOException(e1); + } + sleepTimeMs = sleepTimeMs << 1; + } catch (Exception e) { + CloseableUtils.closeQuietly(instancesCache); + LOG.error("Unable to start curator PathChildrenCache", e); + throw new IOException(e); + } + } + } + + public void start() throws IOException { + if (zooKeeperClient != null) { + if (isZkEnforceSASLClient(conf)) { + if (saslLoginContextName != null) { + SecurityUtils.setZookeeperClientKerberosJaasConfig(zkPrincipal, zkKeytab, saslLoginContextName); + } + if (zkPrincipal != null) { + userNameFromPrincipal = RegistryUtilities.getUserNameFromPrincipal(zkPrincipal); + } + } + zooKeeperClient.start(); + } + // Init closeable utils in case register is not called (see HIVE-13322) + CloseableUtils.class.getName(); + } + + protected boolean isZkEnforceSASLClient(Configuration conf) { + return UserGroupInformation.isSecurityEnabled() && + MetastoreConf.getBoolVar(conf, ConfVars.THRIFT_ZOOKEEPER_USE_KERBEROS); + } + + protected void unregisterInternal() { + CloseableUtils.closeQuietly(znode); + } + + public void stop() { + CloseableUtils.closeQuietly(znode); + CloseableUtils.closeQuietly(instancesCache); + CloseableUtils.closeQuietly(zooKeeperClient); + } + + protected final InstanceType getInstanceByPath(String path) { + return pathToInstanceCache.get(path); + } + + protected final String getRegistrationZnodePath() { + return znodePath; + } + + private int extractSeqNum(String nodeName) { + // Extract the sequence number of this 
ephemeral-sequential znode. + String ephSeqVersionStr = nodeName.substring(workerNodePrefix.length()); + try { + return Integer.parseInt(ephSeqVersionStr); + } catch (NumberFormatException e) { + LOG.error("Cannot parse " + ephSeqVersionStr + " from " + nodeName, e); + throw e; + } + } + + // for debugging + private class ZkConnectionStateListener implements ConnectionStateListener { + @Override + public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) { + LOG.info("Connection state change notification received. State: {}", connectionState); + } + } + + public String currentUser() { + try { + return UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} + + + diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/RegistryUtilities.java similarity index 75% rename from llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/RegistryUtilities.java index e069e4349097..fbcaa838a878 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/RegistryUtilities.java @@ -49,4 +49,25 @@ public static String getCanonicalHostName() { public static String getUUID() { return String.valueOf(UUID.randomUUID()); } -} \ No newline at end of file + + /** + * Extract username from Kerberos principal. 
+ * Example: "user@REALM" -> "user" + * + * @param principal Kerberos principal + * @return username part of principal + */ + public static String getUserNameFromPrincipal(String principal) { + if (principal == null || principal.isEmpty()) { + return null; + } + int atIndex = principal.indexOf('@'); + if (atIndex > 0) { + return principal.substring(0, atIndex); + } + return principal; + } +} + + + diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/ServiceInstance.java similarity index 99% rename from llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/ServiceInstance.java index 4493e997c2e2..2636440e48d1 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/ServiceInstance.java @@ -44,4 +44,7 @@ public interface ServiceInstance { */ Map getProperties(); -} \ No newline at end of file +} + + + diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java similarity index 99% rename from llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java index 63178cc06c4f..34b78729f948 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java @@ -59,3 +59,6 @@ public interface ServiceInstanceSet { int size(); } + + + diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java similarity index 99% rename from llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java index cc1ba337523e..83ab182eae0a 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java @@ -40,3 +40,6 @@ public interface ServiceInstanceStateChangeListener { + + /** + * Start the service registry + */ + void start() throws IOException; + + /** + * Stop the service registry + */ + void stop() throws IOException; + + /** + * Register the current instance - the implementation takes care of the endpoints to register. + * @return self identifying name + */ + String register() throws IOException; + + /** + * Remove the current registration cleanly (implementation defined cleanup) + */ + void unregister() throws IOException; + + /** + * Update the current registration with the given attributes. + */ + void updateRegistration(Iterable> attributes) throws IOException; + + /** + * Client API to get the list of instances registered via the current registry key. + * @param component + * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not + * started yet. 0 means do not wait. + */ + ServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws + IOException; + + /** + * Adds state change listeners for service instances. 
+ * @param listener - state change listener + */ + void registerStateChangeListener(ServiceInstanceStateChangeListener listener) throws IOException; + + /** + * @return The application ID of the LLAP cluster. + */ + ApplicationId getApplicationId() throws IOException; +} + + + diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java similarity index 88% rename from llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java index 19c37691c67b..559905773315 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java @@ -18,6 +18,7 @@ import java.util.Objects; import org.apache.hadoop.hive.registry.ServiceInstance; +import org.apache.hadoop.hive.metastore.registry.impl.ZkRegistryBase; import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; import org.apache.hadoop.registry.client.types.AddressTypes; import org.apache.hadoop.registry.client.types.Endpoint; @@ -40,10 +41,12 @@ public ServiceInstanceBase() { public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException { LOG.trace("Working with ServiceRecord: {}", srv); final Endpoint rpc = srv.getInternalEndpoint(rpcName); - this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_HOSTNAME_FIELD); - this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), - AddressTypes.ADDRESS_PORT_FIELD)); + if (rpc != null && !rpc.addresses.isEmpty()) { + this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_HOSTNAME_FIELD); + this.rpcPort = 
Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), + AddressTypes.ADDRESS_PORT_FIELD)); + } this.workerIdentity = srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER); this.properties = srv.attributes(); } @@ -109,4 +112,5 @@ public String toString() { return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort + "]"; } -} \ No newline at end of file +} + diff --git a/standalone-metastore/metastore-rest-catalog/pom.xml b/standalone-metastore/metastore-rest-catalog/pom.xml index 896723d22768..5360ccd76902 100644 --- a/standalone-metastore/metastore-rest-catalog/pom.xml +++ b/standalone-metastore/metastore-rest-catalog/pom.xml @@ -42,6 +42,26 @@ hive-iceberg-catalog ${hive.version} + + org.apache.hadoop + hadoop-yarn-registry + ${hadoop.version} + + + org.apache.curator + curator-framework + ${curator.version} + + + org.apache.curator + curator-recipes + ${curator.version} + + + org.apache.zookeeper + zookeeper + ${zookeeper.version} + org.apache.hive diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java index 4b085e9d34cf..c4cdb96b56e2 100644 --- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java @@ -81,6 +81,9 @@ private Catalog createCatalog() { final String configExtWarehouse = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.WAREHOUSE_EXTERNAL); if (configExtWarehouse != null) { properties.put("external-warehouse", configExtWarehouse); + // HiveCatalog reads hive.metastore.warehouse.external.dir directly from Configuration, + // so we need to set it explicitly + configuration.set("hive.metastore.warehouse.external.dir", configExtWarehouse); } if 
(configuration.get(SERVLET_ID_KEY) != null) { // For the testing purpose. HiveCatalog caches a metastore client in a static field. As our tests can spin up diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/discovery/RESTCatalogEndpointDiscovery.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/discovery/RESTCatalogEndpointDiscovery.java new file mode 100644 index 000000000000..721c59a9eb5e --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/discovery/RESTCatalogEndpointDiscovery.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.rest.discovery; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.iceberg.rest.ha.RESTCatalogHARegistry; +import org.apache.iceberg.rest.ha.RESTCatalogHARegistryHelper; +import org.apache.iceberg.rest.ha.RESTCatalogInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * REST Catalog Endpoint Discovery Helper. + * Discovers REST Catalog server endpoints via ZooKeeper for HA setups. + * Similar to how JDBC clients discover HiveServer2 instances. + * + * This is NOT an HMS client - it's a discovery utility that returns endpoint URLs. + * Use the discovered endpoint with HiveRESTCatalogClient or Iceberg RESTCatalog. + * + * Usage: + *
+ *   Configuration conf = new Configuration();
+ *   conf.set("hive.metastore.rest.catalog.ha.enabled", "true");
+ *   conf.set("hive.zookeeper.quorum", "zk1:2181,zk2:2181");
+ *   
+ *   RESTCatalogEndpointDiscovery discovery = new RESTCatalogEndpointDiscovery(conf);
+ *   String endpoint = discovery.getEndpoint();
+ *   // Use endpoint to connect to REST Catalog via HiveRESTCatalogClient or RESTCatalog
+ * 
+ */ +public class RESTCatalogEndpointDiscovery { + private static final Logger LOG = LoggerFactory.getLogger(RESTCatalogEndpointDiscovery.class); + + private RESTCatalogHARegistry registry; + private Configuration conf; + private boolean haMode; + + public RESTCatalogEndpointDiscovery(Configuration conf) throws IOException { + this.conf = conf; + this.haMode = MetastoreConf.getBoolVar(conf, ConfVars.REST_CATALOG_HA_ENABLED); + + if (haMode) { + this.registry = RESTCatalogHARegistryHelper.getRegistry(conf); + } + } + + /** + * Get the REST Catalog endpoint URL. + * In HA mode: returns leader's endpoint (active-passive) or random instance (active-active) + * In non-HA mode: returns configured endpoint or discovers from HMS + * + * @return REST Catalog endpoint URL + * @throws IOException if discovery fails + */ + public String getEndpoint() throws IOException { + if (!haMode) { + // Non-HA mode: use direct configuration + String uri = MetastoreConf.getVar(conf, ConfVars.REST_CATALOG_INSTANCE_URI); + if (uri != null && !uri.isEmpty()) { + return "http://" + uri + "/iceberg"; + } + // Fallback to HMS URIs discovery + return discoverFromHMS(); + } + + // HA mode: discover from ZooKeeper + Collection instances = registry.getAll(); + + if (instances.isEmpty()) { + throw new IOException("No REST Catalog instances found in ZooKeeper"); + } + + String haModeStr = MetastoreConf.getVar(conf, ConfVars.REST_CATALOG_HA_MODE); + boolean isActivePassive = "active-passive".equalsIgnoreCase(haModeStr); + + if (isActivePassive) { + // Active-Passive: return leader + RESTCatalogInstance leader = registry.getLeader(); + if (leader != null) { + LOG.info("Connecting to REST Catalog leader: {}", leader.getRestEndpoint()); + return leader.getRestEndpoint(); + } else { + throw new IOException("No REST Catalog leader found"); + } + } else { + // Active-Active: return random instance + List activeInstances = instances.stream() + .filter(RESTCatalogInstance::isActive) + 
.collect(Collectors.toList()); + + if (activeInstances.isEmpty()) { + // Fallback to all instances if none marked as active + activeInstances = instances.stream().collect(Collectors.toList()); + } + + if (activeInstances.isEmpty()) { + throw new IOException("No active REST Catalog instances found"); + } + + RESTCatalogInstance instance = activeInstances.get( + new Random().nextInt(activeInstances.size())); + LOG.info("Connecting to REST Catalog instance: {}", instance.getRestEndpoint()); + return instance.getRestEndpoint(); + } + } + + /** + * Get all available REST Catalog instances. + * @return collection of instances + * @throws IOException if discovery fails + */ + public Collection getAllInstances() throws IOException { + if (!haMode) { + throw new IOException("HA mode is not enabled"); + } + return registry.getAll(); + } + + /** + * Get the leader instance (active-passive mode only). + * @return leader instance or null if not found + * @throws IOException if discovery fails + */ + public RESTCatalogInstance getLeader() throws IOException { + if (!haMode) { + throw new IOException("HA mode is not enabled"); + } + return registry.getLeader(); + } + + private String discoverFromHMS() { + // Similar to how HMS clients discover HMS instances + String hmsUris = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS); + if (hmsUris == null || hmsUris.isEmpty()) { + throw new RuntimeException("Neither REST Catalog HA nor HMS URIs are configured"); + } + // Parse and return REST Catalog endpoint + // This assumes REST Catalog runs on same hosts as HMS + String[] uris = hmsUris.split(","); + String firstUri = uris[0].trim(); + // Convert thrift://host:9083 to http://host:8080/iceberg + String hostPort = firstUri.replace("thrift://", ""); + String[] parts = hostPort.split(":"); + String host = parts[0]; + // Default REST Catalog port + int restPort = MetastoreConf.getIntVar(conf, ConfVars.CATALOG_SERVLET_PORT); + if (restPort < 0) { + restPort = 8080; + } + return "http://" + 
host + ":" + restPort + "/iceberg"; + } + + /** + * Close the discovery helper and release resources. + * @throws IOException if close fails + */ + public void close() throws IOException { + if (registry != null) { + registry.stop(); + } + } +} + diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogHAInstanceSet.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogHAInstanceSet.java new file mode 100644 index 000000000000..9d418a7a489d --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogHAInstanceSet.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.rest.ha; + +import java.util.Collection; +import java.util.Set; + +/** + * Interface for REST Catalog HA instance set. + * Similar to HiveServer2HAInstanceSet. + */ +public interface RESTCatalogHAInstanceSet { + /** + * In Active/Passive setup, returns current active leader. + * @return leader instance + */ + RESTCatalogInstance getLeader(); + + /** + * Get all REST Catalog instances. 
+ * @return collection of instances + */ + Collection getAll(); + + /** + * Get instance by instance ID. + * @param instanceId instance ID + * @return instance or null if not found + */ + RESTCatalogInstance getInstance(String instanceId); + + /** + * Get instances by host. + * @param host hostname + * @return set of instances on that host + */ + Set getByHost(String host); + + /** + * Get number of instances. + * @return size + */ + int size(); +} + diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogHARegistry.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogHARegistry.java new file mode 100644 index 000000000000..1310b02122ca --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogHARegistry.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.rest.ha; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.utils.CloseableUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.IPStackUtils; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.registry.ServiceRegistry; +import org.apache.hadoop.hive.metastore.registry.impl.ZkRegistryBase; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * REST Catalog High Availability Registry. + * Similar to HS2ActivePassiveHARegistry, provides ZooKeeper-based service discovery + * and leader election for REST Catalog instances. 
+ */ +public class RESTCatalogHARegistry extends ZkRegistryBase implements + ServiceRegistry, RESTCatalogHAInstanceSet { + + private static final Logger LOG = LoggerFactory.getLogger(RESTCatalogHARegistry.class); + static final String ACTIVE_ENDPOINT = "activeEndpoint"; + static final String PASSIVE_ENDPOINT = "passiveEndpoint"; + private static final String SASL_LOGIN_CONTEXT_NAME = "RESTCatalogHAZooKeeperClient"; + private static final String INSTANCE_PREFIX = "instance-"; + private static final String INSTANCE_GROUP = "instances"; + private static final String LEADER_LATCH_PATH = "/_LEADER"; + + private LeaderLatch leaderLatch; + private Map registeredListeners = new HashMap<>(); + private String latchPath; + private ServiceRecord srv; + private boolean isClient; + private final String uniqueId; + + // There are 2 paths under which the instances get registered + // 1) Standard path used by ZkRegistryBase where all instances register themselves + // Secure: /restCatalogHA-sasl/instances/instance-0000000000 + // Unsecure: /restCatalogHA-unsecure/instances/instance-0000000000 + // 2) Leader latch path used for REST Catalog HA Active/Passive configuration + // Secure: /restCatalogHA-sasl/_LEADER/xxxx-latch-0000000000 + // Unsecure: /restCatalogHA-unsecure/_LEADER/xxxx-latch-0000000000 + + /** + * Factory method to create REST Catalog HA Registry. 
+ * @param conf Configuration + * @param isClient true if this is a client-side registry, false if server-side + * @return RESTCatalogHARegistry instance + */ + public static RESTCatalogHARegistry create(Configuration conf, boolean isClient) { + String zkNameSpace = MetastoreConf.getVar(conf, ConfVars.REST_CATALOG_HA_REGISTRY_NAMESPACE); + Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace), + ConfVars.REST_CATALOG_HA_REGISTRY_NAMESPACE.getVarname() + " cannot be null or empty"); + String zkNameSpacePrefix = zkNameSpace + "-"; + return new RESTCatalogHARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, + null, null, isClient ? SASL_LOGIN_CONTEXT_NAME : null, conf, isClient); + } + + private RESTCatalogHARegistry(final String instanceName, final String zkNamespacePrefix, + final String leaderLatchPath, final String krbPrincipal, final String krbKeytab, + final String saslContextName, final Configuration conf, final boolean isClient) { + super(instanceName, conf, null, zkNamespacePrefix, null, INSTANCE_PREFIX, INSTANCE_GROUP, + saslContextName, krbPrincipal, krbKeytab, null); + this.isClient = isClient; + if (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) && + conf.get(ZkRegistryBase.UNIQUE_IDENTIFIER) != null) { + this.uniqueId = conf.get(ZkRegistryBase.UNIQUE_IDENTIFIER); + } else { + this.uniqueId = UUID.randomUUID().toString(); + } + this.latchPath = leaderLatchPath; + this.leaderLatch = getNewLeaderLatchPath(); + } + + @Override + public void start() throws IOException { + super.start(); + if (!isClient) { + this.srv = getNewServiceRecord(); + register(); + registerLeaderLatchListener(new RESTCatalogLeaderLatchListener(), null); + try { + // All participating instances use the same latch path, and curator randomly chooses one instance to be leader + leaderLatch.start(); + } catch (Exception e) { + throw new IOException(e); + } + LOG.info("Registered REST Catalog with ZK. 
service record: {}", srv); + } else { + populateCache(); + LOG.info("Populating instances cache for client"); + } + } + + @Override + protected void unregisterInternal() { + super.unregisterInternal(); + } + + @Override + public String register() throws IOException { + updateEndpoint(srv, PASSIVE_ENDPOINT); + return registerServiceRecord(srv, uniqueId); + } + + @Override + public void unregister() { + if (leaderLatch != null) { + try { + leaderLatch.close(); + } catch (IllegalStateException e) { + // Already closed, ignore + LOG.debug("LeaderLatch already closed during unregister", e); + } catch (Exception e) { + LOG.warn("Error closing LeaderLatch during unregister", e); + } + leaderLatch = null; + } + unregisterInternal(); + } + + @Override + public void updateRegistration(Iterable> attributes) throws IOException { + throw new UnsupportedOperationException(); + } + + private void populateCache() throws IOException { + PathChildrenCache pcc = ensureInstancesCache(0); + populateCache(pcc, false); + } + + @Override + public ServiceInstanceSet getInstances(final String component, + final long clusterReadyTimeoutMs) throws IOException { + throw new IOException("Not supported to get instances by component name"); + } + + private void addActiveEndpointToServiceRecord() throws IOException { + addEndpointToServiceRecord(getNewServiceRecord(), ACTIVE_ENDPOINT); + } + + private void addPassiveEndpointToServiceRecord() throws IOException { + addEndpointToServiceRecord(getNewServiceRecord(), PASSIVE_ENDPOINT); + } + + private void addEndpointToServiceRecord(final ServiceRecord srv, final String endpointName) + throws IOException { + updateEndpoint(srv, endpointName); + updateServiceRecord(srv, doCheckAcls, true); + } + + private void updateEndpoint(final ServiceRecord srv, final String endpointName) { + String instanceUri = srv.get(ConfVars.REST_CATALOG_INSTANCE_URI.getVarname()); + IPStackUtils.HostPort hostPort; + if (instanceUri != null && !instanceUri.isEmpty()) { + 
hostPort = IPStackUtils.getHostAndPort(instanceUri); + } else { + // Fallback: use hostname and port from configuration + String hostname = ZkRegistryBase.hostname; + int port = MetastoreConf.getIntVar(conf, ConfVars.CATALOG_SERVLET_PORT); + if (port < 0) { + port = 8080; // Default port + } + hostPort = new IPStackUtils.HostPort(hostname, port); + } + final String endpointHostname = hostPort.getHostname(); + final int endpointPort = hostPort.getPort(); + Endpoint urlEndpoint = RegistryTypeUtils.ipcEndpoint(endpointName, + new InetSocketAddress(endpointHostname, endpointPort)); + srv.addInternalEndpoint(urlEndpoint); + LOG.info("Added {} endpoint to service record: {}", endpointName, urlEndpoint); + } + + @Override + public void stop() { + if (leaderLatch != null) { + try { + leaderLatch.close(); + } catch (IllegalStateException e) { + // Already closed, ignore + LOG.debug("LeaderLatch already closed during stop", e); + } catch (Exception e) { + LOG.warn("Error closing LeaderLatch during stop", e); + } + leaderLatch = null; + } + super.stop(); + } + + @Override + protected RESTCatalogInstance createServiceInstance(final ServiceRecord srv) throws IOException { + Endpoint activeEndpoint = srv.getInternalEndpoint(ACTIVE_ENDPOINT); + return new RESTCatalogInstance(srv, activeEndpoint != null ? 
ACTIVE_ENDPOINT : PASSIVE_ENDPOINT); + } + + @Override + public synchronized void registerStateChangeListener( + final ServiceInstanceStateChangeListener listener) throws IOException { + super.registerStateChangeListener(listener); + } + + @Override + protected String getZkPathUser(final Configuration conf) { + return currentUser(); + } + + @Override + public org.apache.hadoop.yarn.api.records.ApplicationId getApplicationId() throws IOException { + throw new IOException("Not supported until REST Catalog runs as YARN application"); + } + + public boolean hasLeadership() { + return leaderLatch != null && leaderLatch.hasLeadership(); + } + + public String getUniqueId() { + return uniqueId; + } + + /** + * Returns a new instance of leader latch path but retains the same uniqueId. + * @return new leader latch + */ + private LeaderLatch getNewLeaderLatchPath() { + return new LeaderLatch(zooKeeperClient, latchPath, uniqueId, + LeaderLatch.CloseMode.NOTIFY_LEADER); + } + + private class RESTCatalogLeaderLatchListener implements LeaderLatchListener { + // Leadership state changes happen inside synchronized methods in curator. + // Do only lightweight actions in main-event handler thread. + @Override + public void isLeader() { + // Only leader publishes instance uri as endpoint which will be used by clients + try { + if (!hasLeadership()) { + LOG.info("isLeader notification received but hasLeadership returned false.. awaiting.."); + leaderLatch.await(); + } + addActiveEndpointToServiceRecord(); + LOG.info("REST Catalog instance in ACTIVE mode. Service record: {}", srv); + } catch (Exception e) { + throw new RuntimeException("Unable to add active endpoint to service record", e); + } + } + + @Override + public void notLeader() { + try { + if (hasLeadership()) { + LOG.info("notLeader notification received but hasLeadership returned true.. awaiting.."); + leaderLatch.await(); + } + addPassiveEndpointToServiceRecord(); + LOG.info("REST Catalog instance lost leadership. 
Switched to PASSIVE standby mode."); + } catch (Exception e) { + throw new RuntimeException("Unable to add passive endpoint to service record", e); + } + } + } + + @Override + public RESTCatalogInstance getLeader() { + for (RESTCatalogInstance instance : getAll()) { + if (instance.isLeader()) { + return instance; + } + } + return null; + } + + @Override + public Collection getAll() { + return getAllInternal(); + } + + @Override + public RESTCatalogInstance getInstance(final String instanceId) { + for (RESTCatalogInstance instance : getAll()) { + if (instance.getWorkerIdentity().equals(instanceId)) { + return instance; + } + } + return null; + } + + @Override + public Set getByHost(final String host) { + return getByHostInternal(host); + } + + @Override + public int size() { + return sizeInternal(); + } + + /** + * Register leader latch listener for leadership change notifications. + * @param latchListener listener + * @param executorService event handler executor service + */ + public void registerLeaderLatchListener(final LeaderLatchListener latchListener, + final ExecutorService executorService) { + registeredListeners.put(latchListener, executorService); + if (executorService == null) { + leaderLatch.addListener(latchListener); + } else { + leaderLatch.addListener(latchListener, executorService); + } + } + + private Map getConfsToPublish() { + final Map confsToPublish = new HashMap<>(); + // Instance URI + String instanceUri = MetastoreConf.getVar(conf, ConfVars.REST_CATALOG_INSTANCE_URI); + if (instanceUri == null || instanceUri.isEmpty()) { + int port = MetastoreConf.getIntVar(conf, ConfVars.CATALOG_SERVLET_PORT); + if (port < 0) { + port = 8080; + } + instanceUri = ZkRegistryBase.hostname + ":" + port; + } + confsToPublish.put(ConfVars.REST_CATALOG_INSTANCE_URI.getVarname(), instanceUri); + confsToPublish.put(ZkRegistryBase.UNIQUE_IDENTIFIER, uniqueId); + // REST Catalog path + confsToPublish.put(ConfVars.ICEBERG_CATALOG_SERVLET_PATH.getVarname(), + 
MetastoreConf.getVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH)); + // Port + confsToPublish.put(ConfVars.CATALOG_SERVLET_PORT.getVarname(), + String.valueOf(MetastoreConf.getIntVar(conf, ConfVars.CATALOG_SERVLET_PORT))); + return confsToPublish; + } + + private ServiceRecord getNewServiceRecord() { + ServiceRecord srv = new ServiceRecord(); + final Map confsToPublish = getConfsToPublish(); + for (Map.Entry entry : confsToPublish.entrySet()) { + srv.set(entry.getKey(), entry.getValue()); + } + return srv; + } +} + diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogHARegistryHelper.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogHARegistryHelper.java new file mode 100644 index 000000000000..d6ccda9935e6 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogHARegistryHelper.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.rest.ha; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.registry.impl.ZkRegistryBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Helper class for accessing REST Catalog HA Registry. + * Provides cached access to RESTCatalogHARegistry instances for service discovery. + * + * This is NOT a REST Catalog client - it's a helper utility for accessing the registry. + * Similar to HS2ActivePassiveHARegistryClient. + */ +public class RESTCatalogHARegistryHelper { + private static final Logger LOG = LoggerFactory.getLogger(RESTCatalogHARegistryHelper.class); + private static final Map registries = new HashMap<>(); + + /** + * Get a REST Catalog HA Registry instance to read from the registry. + * Only used for service discovery to connect to active REST Catalog instances. 
+ * + * @param conf Configuration instance which contains service registry information + * @return RESTCatalogHARegistry instance for reading from the registry + * @throws IOException if registry creation fails + */ + public static synchronized RESTCatalogHARegistry getRegistry(Configuration conf) throws IOException { + String namespace = MetastoreConf.getVar(conf, ConfVars.REST_CATALOG_HA_REGISTRY_NAMESPACE); + Preconditions.checkArgument(!StringUtils.isBlank(namespace), + ConfVars.REST_CATALOG_HA_REGISTRY_NAMESPACE.getVarname() + " cannot be null or empty"); + String nsKey = ZkRegistryBase.getRootNamespace(conf, null, namespace + "-"); + RESTCatalogHARegistry registry = registries.get(nsKey); + if (registry == null) { + registry = RESTCatalogHARegistry.create(conf, true); + registry.start(); + registries.put(nsKey, registry); + LOG.info("Added REST Catalog registry to cache with namespace: {}", nsKey); + } else { + LOG.info("Returning cached REST Catalog registry for namespace: {}", nsKey); + } + return registry; + } +} + diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogInstance.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogInstance.java new file mode 100644 index 000000000000..d98479778ce0 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/ha/RESTCatalogInstance.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.rest.ha; + +import java.io.IOException; +import java.util.Objects; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.registry.impl.ServiceInstanceBase; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import com.google.common.base.Preconditions; + +/** + * REST Catalog instance representation for HA service discovery. + * Similar to HiveServer2Instance. + */ +public class RESTCatalogInstance extends ServiceInstanceBase { + private boolean isLeader; + private String restEndpoint; + + // Empty constructor for Jackson + public RESTCatalogInstance() { + } + + public RESTCatalogInstance(final ServiceRecord srv, final String endPointName) throws IOException { + super(srv, endPointName); + + Endpoint activeEndpoint = srv.getInternalEndpoint(RESTCatalogHARegistry.ACTIVE_ENDPOINT); + Endpoint passiveEndpoint = srv.getInternalEndpoint(RESTCatalogHARegistry.PASSIVE_ENDPOINT); + this.isLeader = activeEndpoint != null; + Preconditions.checkArgument(activeEndpoint == null || passiveEndpoint == null, + "Incorrect service record. 
Both active and passive endpoints cannot be non-null!"); + + String instanceUri = srv.get(MetastoreConf.ConfVars.REST_CATALOG_INSTANCE_URI.getVarname()); + if (instanceUri != null && !instanceUri.isEmpty()) { + this.restEndpoint = "http://" + instanceUri + "/iceberg"; + } else { + // Fallback: construct from host and port + this.restEndpoint = "http://" + getHost() + ":" + getRpcPort() + "/iceberg"; + } + } + + public boolean isLeader() { + return isLeader; + } + + public String getRestEndpoint() { + return restEndpoint; + } + + public void setLeader(final boolean leader) { + isLeader = leader; + } + + public void setRestEndpoint(final String restEndpoint) { + this.restEndpoint = restEndpoint; + } + + public boolean isActive() { + return isLeader; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RESTCatalogInstance other = (RESTCatalogInstance) o; + return super.equals(o) && isLeader == other.isLeader + && Objects.equals(restEndpoint, other.restEndpoint); + } + + @Override + public int hashCode() { + return super.hashCode() + Objects.hashCode(isLeader) + Objects.hashCode(restEndpoint); + } + + @Override + public String toString() { + return "instanceId: " + getWorkerIdentity() + " isLeader: " + isLeader + + " host: " + getHost() + " port: " + getRpcPort() + + " restEndpoint: " + restEndpoint; + } +} diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java new file mode 100644 index 000000000000..cba257ad59a8 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.rest.standalone; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.ServletServerBuilder; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.iceberg.rest.HMSCatalogFactory; +import org.apache.iceberg.rest.ha.RESTCatalogHARegistry; +import org.eclipse.jetty.server.Server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Standalone REST Catalog Server with High Availability support. + * Can be deployed independently from HMS and scaled independently. 
+ */ +public class StandaloneRESTCatalogServer { + private static final Logger LOG = LoggerFactory.getLogger(StandaloneRESTCatalogServer.class); + + private Server server; + private Configuration conf; + private RESTCatalogHARegistry haRegistry; + private AtomicBoolean isLeader = new AtomicBoolean(false); + private ExecutorService leaderActionsExecutorService; + private LeaderLatchListener leaderLatchListener; + + public StandaloneRESTCatalogServer(Configuration conf) { + this.conf = conf; + } + + /** + * Start the REST Catalog server. + * @throws Exception if startup fails + */ + public void start() throws Exception { + // Check if HA is enabled + boolean haEnabled = MetastoreConf.getBoolVar(conf, ConfVars.REST_CATALOG_HA_ENABLED); + + if (haEnabled) { + LOG.info("Starting REST Catalog server with High Availability"); + startWithHA(); + } else { + LOG.info("Starting REST Catalog server without HA"); + startWithoutHA(); + } + } + + private void startWithHA() throws Exception { + // Initialize HA Registry + haRegistry = RESTCatalogHARegistry.create(conf, false); + + // Create leader latch listener + leaderLatchListener = new RESTCatalogLeaderLatchListener(this); + leaderActionsExecutorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("REST-Catalog-Leader-Actions-%d") + .build()); + + // Register leader latch listener + haRegistry.registerLeaderLatchListener(leaderLatchListener, leaderActionsExecutorService); + + // Start HA registry (registers in ZooKeeper) + haRegistry.start(); + + // Start the servlet server + int restPort = MetastoreConf.getIntVar(conf, ConfVars.CATALOG_SERVLET_PORT); + if (restPort < 0) { + restPort = 8080; // Default port + } + + ServletServerBuilder builder = new ServletServerBuilder(conf); + ServletServerBuilder.Descriptor restCatalogDescriptor = + HMSCatalogFactory.createServlet(conf); + + if (restCatalogDescriptor == null) { + throw new RuntimeException("Failed to create REST 
Catalog servlet"); + } + + builder.addServlet(restCatalogDescriptor); + server = builder.start(LOG); + + if (server == null || !server.isStarted()) { + throw new RuntimeException("Failed to start REST Catalog server"); + } + + LOG.info("REST Catalog server started (waiting for leadership) on port {}", restPort); + } + + private void startWithoutHA() throws Exception { + int restPort = MetastoreConf.getIntVar(conf, ConfVars.CATALOG_SERVLET_PORT); + if (restPort < 0) { + restPort = 8080; + } + + String hmsUris = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS); + if (hmsUris == null || hmsUris.isEmpty()) { + throw new IllegalArgumentException("hive.metastore.uris must be configured"); + } + + LOG.info("Starting standalone REST Catalog server on port {}", restPort); + LOG.info("Connecting to HMS at: {}", hmsUris); + + ServletServerBuilder builder = new ServletServerBuilder(conf); + ServletServerBuilder.Descriptor restCatalogDescriptor = + HMSCatalogFactory.createServlet(conf); + + if (restCatalogDescriptor == null) { + throw new RuntimeException("Failed to create REST Catalog servlet"); + } + + builder.addServlet(restCatalogDescriptor); + server = builder.start(LOG); + + if (server == null || !server.isStarted()) { + throw new RuntimeException("Failed to start REST Catalog server"); + } + + LOG.info("REST Catalog server started successfully on port {}", restPort); + } + + private class RESTCatalogLeaderLatchListener implements LeaderLatchListener { + private final StandaloneRESTCatalogServer server; + + RESTCatalogLeaderLatchListener(StandaloneRESTCatalogServer server) { + this.server = server; + } + + @Override + public void isLeader() { + LOG.info("REST Catalog instance {} became the LEADER", + haRegistry.getUniqueId()); + server.isLeader.set(true); + // Server is now active and accepting connections + } + + @Override + public void notLeader() { + LOG.info("REST Catalog instance {} LOST LEADERSHIP", + haRegistry.getUniqueId()); + server.isLeader.set(false); + // 
Optionally: stop accepting new connections, drain existing ones + // For now, we'll continue serving but mark as passive + } + } + + /** + * Stop the REST Catalog server. + * @throws Exception if shutdown fails + */ + public void stop() throws Exception { + if (haRegistry != null) { + haRegistry.unregister(); + haRegistry.stop(); + } + if (leaderActionsExecutorService != null) { + leaderActionsExecutorService.shutdown(); + } + if (server != null) { + server.stop(); + LOG.info("REST Catalog server stopped"); + } + } + + /** + * Check if this instance is the leader. + * @return true if leader, false otherwise + */ + public boolean isLeader() { + return isLeader.get(); + } + + /** + * Get the server instance. + * @return Jetty server instance + */ + public Server getServer() { + return server; + } + + /** + * Main method for standalone deployment. + * @param args command line arguments + */ + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + + // Load configuration from files + conf.addResource("hive-site.xml"); + conf.addResource("core-site.xml"); + + // Override with system properties + for (String key : System.getProperties().stringPropertyNames()) { + if (key.startsWith("hive.") || key.startsWith("metastore.")) { + conf.set(key, System.getProperty(key)); + } + } + + StandaloneRESTCatalogServer server = new StandaloneRESTCatalogServer(conf); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + server.stop(); + } catch (Exception e) { + LOG.error("Error stopping server", e); + } + })); + + server.start(); + + // Keep running + if (server.getServer() != null) { + server.getServer().join(); + } + } +} +