diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java index cb852a7e84a..27639bae4e5 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java @@ -18,6 +18,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.URI; +import java.util.Arrays; +import java.util.List; import java.util.Properties; import javax.ws.rs.core.Configurable; @@ -32,6 +35,7 @@ import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException; import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer; import io.confluent.rest.Application; +import io.confluent.rest.RestConfig; import io.confluent.rest.RestConfigException; public class SchemaRegistryRestApplication extends Application { @@ -50,9 +54,13 @@ public SchemaRegistryRestApplication(SchemaRegistryConfig config) { @Override public void setupResources(Configurable config, SchemaRegistryConfig schemaRegistryConfig) { try { - schemaRegistry = new KafkaSchemaRegistry(schemaRegistryConfig, + schemaRegistry = new KafkaSchemaRegistry(this, new SchemaRegistrySerializer()); - schemaRegistry.init(); + // This is a bit of a hack to deal with startup ordering issues when port == 0. See note in + // onStarted. 
+ if (listeners().get(0).getPort() > 0) { + schemaRegistry.init(); + } } catch (SchemaRegistryException e) { log.error("Error starting the schema registry", e); System.exit(1); @@ -65,6 +73,27 @@ public void setupResources(Configurable config, SchemaRegistryConfig schemaRe config.register(new CompatibilityResource(schemaRegistry)); } + @Override + public void onStarted() { + // We override this only for the case of unit/integration tests that use port == 0 to avoid + // issues with requiring specific ports/parallel tests/ability to determine truly free ports. + // We have to reorder startup a bit in this case because schema registry identity information + // includes the port, but we cannot know this until Jetty completes startup. For the relevant + // tests, it is safe to reorder since the relevant steps are blocking, so by the time we finally + // return from the Jetty's Server.start() call, everything will be initialized. In real + // deployments the port would always be fixed and we would want to run this initialization + // before Server.start() (in setupResources) to ensure we have a connection and can handle + // requests when we start Jetty listening. 
+ if (listeners().get(0).getPort() == 0) { + try { + schemaRegistry.init(); + } catch (SchemaRegistryException e) { + log.error("Error starting the schema registry", e); + System.exit(1); + } + } + } + @Override public void onShutdown() { schemaRegistry.close(); @@ -74,4 +103,13 @@ public void onShutdown() { public KafkaSchemaRegistry schemaRegistry() { return schemaRegistry; } + + private List listeners() { + return parseListeners( + config.getList(RestConfig.LISTENERS_CONFIG), + config.getInt(RestConfig.PORT_CONFIG), + Arrays.asList("http", "https"), + "http" + ); + } } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index 45918a6c856..249c28f2a9f 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -87,19 +87,18 @@ public class KafkaSchemaRegistry implements SchemaRegistry { public static final String ZOOKEEPER_SCHEMA_ID_COUNTER = "/schema_id_counter"; private static final int ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_WRITE_RETRY_BACKOFF_MS = 50; private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class); + + private final Application app; final Map guidToSchemaKey; final Map schemaHashToGuid; - private final KafkaStore kafkaStore; private final Serializer serializer; - private final SchemaRegistryIdentity myIdentity; private final Object masterLock = new Object(); private final AvroCompatibilityLevel defaultCompatibilityLevel; private final String schemaRegistryZkNamespace; - private final String kafkaClusterZkUrl; - private final int zkSessionTimeoutMs; private final int kafkaStoreTimeoutMs; private final boolean isEligibleForMasterElector; - private String schemaRegistryZkUrl; + private KafkaStore kafkaStore; + private SchemaRegistryIdentity myIdentity; private 
ZkUtils zkUtils; private SchemaRegistryIdentity masterIdentity; private RestService masterRestService; @@ -119,28 +118,39 @@ public class KafkaSchemaRegistry implements SchemaRegistry { // data is already in the kafkastore. private int maxIdInKafkaStore = -1; - public KafkaSchemaRegistry(SchemaRegistryConfig config, + public KafkaSchemaRegistry(Application app, Serializer serializer) throws SchemaRegistryException { - String host = config.getString(SchemaRegistryConfig.HOST_NAME_CONFIG); - int port = getPortForIdentity(config.getInt(SchemaRegistryConfig.PORT_CONFIG), - config.getList(RestConfig.LISTENERS_CONFIG)); + this.app = app; + SchemaRegistryConfig config = app.getConfiguration(); this.schemaRegistryZkNamespace = config.getString(SchemaRegistryConfig.SCHEMAREGISTRY_ZK_NAMESPACE); this.isEligibleForMasterElector = config.getBoolean(SchemaRegistryConfig.MASTER_ELIGIBILITY); - this.myIdentity = new SchemaRegistryIdentity(host, port, isEligibleForMasterElector); - this.kafkaClusterZkUrl = - config.getString(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG); - this.zkSessionTimeoutMs = - config.getInt(SchemaRegistryConfig.KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG); - this.kafkaStoreTimeoutMs = + this.kafkaStoreTimeoutMs = config.getInt(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG); this.serializer = serializer; this.defaultCompatibilityLevel = config.compatibilityType(); this.guidToSchemaKey = new HashMap(); this.schemaHashToGuid = new HashMap(); + } + + @Override + public void init() throws SchemaRegistryInitializationException { + // A Schema Registry instance's identity is in part the port it listens on. Multiple listeners + // are possible, so the first is used for the identity (and the connection to be used to + // forward requests if this node is the master). In theory, any port from any listener would + // be sufficient. Choosing the first, instead of say the last, + // is arbitrary. + // + // To support tests that dynamically allocate ports (i.e. 
set value to 0), we are careful to + // use the actually listening port rather than the PORT or LISTENER settings directly. + SchemaRegistryConfig config = app.getConfiguration(); + String host = config.getString(SchemaRegistryConfig.HOST_NAME_CONFIG); + int listeningPort = app.localPorts().get(0); + this.myIdentity = new SchemaRegistryIdentity(host, listeningPort, isEligibleForMasterElector); kafkaStore = new KafkaStore( config, + listeningPort, new KafkaStoreMessageHandler(this), this.serializer, new InMemoryStore(), new NoopKey()); @@ -161,28 +171,7 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config, + " node where all register schema and config update requests are " + "served."); this.masterNodeSensor.add(m, new Gauge()); - } - /** - * A Schema Registry instance's identity is in part the port it listens on. Currently the - * port can either be configured via the deprecated `port` configuration, or via the `listeners` - * configuration. - * - * This method uses `Application.parseListeners()` from `rest-utils` to get a list of listeners, and - * returns the port of the first listener to be used for the instance's identity. - * - * In theory, any port from any listener would be sufficient. Choosing the first, instead of say the last, - * is arbitrary. - */ - // TODO: once RestConfig.PORT_CONFIG is deprecated, remove the port parameter. 
- static int getPortForIdentity(int port, List configuredListeners) { - List listeners = Application.parseListeners(configuredListeners, port, - Arrays.asList("http", "https"), "http"); - return listeners.get(0).getPort(); - } - - @Override - public void init() throws SchemaRegistryInitializationException { try { kafkaStore.init(); } catch (StoreInitializationException e) { @@ -203,13 +192,17 @@ public void init() throws SchemaRegistryInitializationException { } private void createZkNamespace() { + int zkSessionTimeoutMs = + app.getConfiguration().getInt(SchemaRegistryConfig.KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG); + String kafkaClusterZkUrl = + app.getConfiguration().getString(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG); int kafkaNamespaceIndex = kafkaClusterZkUrl.indexOf("/"); String zkConnForNamespaceCreation = kafkaNamespaceIndex > 0 ? kafkaClusterZkUrl.substring(0, kafkaNamespaceIndex) : kafkaClusterZkUrl; String schemaRegistryNamespace = "/" + schemaRegistryZkNamespace; - schemaRegistryZkUrl = zkConnForNamespaceCreation + schemaRegistryNamespace; + String schemaRegistryZkUrl = zkConnForNamespaceCreation + schemaRegistryNamespace; ZkUtils zkUtilsForNamespaceCreation = ZkUtils.apply( zkConnForNamespaceCreation, diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java index c50b6ad9f7c..bdb34ee5534 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java @@ -84,6 +84,7 @@ public class KafkaStore implements Store { private final SchemaRegistryConfig config; public KafkaStore(SchemaRegistryConfig config, + int identityPort, StoreUpdateHandler storeUpdateHandler, Serializer serializer, Store localStore, @@ -93,11 +94,9 @@ public KafkaStore(SchemaRegistryConfig config, this.topic = 
config.getString(SchemaRegistryConfig.KAFKASTORE_TOPIC_CONFIG); this.desiredReplicationFactor = config.getInt(SchemaRegistryConfig.KAFKASTORE_TOPIC_REPLICATION_FACTOR_CONFIG); - int port = KafkaSchemaRegistry.getPortForIdentity(config.getInt(SchemaRegistryConfig.PORT_CONFIG), - config.getList(RestConfig.LISTENERS_CONFIG)); this.groupId = String.format("schema-registry-%s-%d", config.getString(SchemaRegistryConfig.HOST_NAME_CONFIG), - port); + identityPort); initTimeout = config.getInt(SchemaRegistryConfig.KAFKASTORE_INIT_TIMEOUT_CONFIG); timeout = config.getInt(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG); this.storeUpdateHandler = storeUpdateHandler; diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java b/core/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java index dc277922e2b..52fd474a7a1 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java @@ -52,32 +52,6 @@ public abstract class ClusterTestHarness { public static final String KAFKASTORE_TOPIC = SchemaRegistryConfig.DEFAULT_KAFKASTORE_TOPIC; protected static final Option SASL_PROPERTIES = Option$.MODULE$.empty(); - /** - * Choose a number of random available ports - */ - public static int[] choosePorts(int count) { - try { - ServerSocket[] sockets = new ServerSocket[count]; - int[] ports = new int[count]; - for (int i = 0; i < count; i++) { - sockets[i] = new ServerSocket(0, 0, InetAddress.getByName("0.0.0.0")); - ports[i] = sockets[i].getLocalPort(); - } - for (int i = 0; i < count; i++) - sockets[i].close(); - return ports; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Choose an available port - */ - public static int choosePort() { - return choosePorts(1)[0]; - } - private int numBrokers; private boolean setupRestApp; private String compatibilityType; @@ -139,7 +113,7 @@ public void setUp() throws 
Exception { getSecurityProtocol()); if (setupRestApp) { - restApp = new RestApp(choosePort(), zkConnect, KAFKASTORE_TOPIC, compatibilityType); + restApp = new RestApp(zkConnect, KAFKASTORE_TOPIC, compatibilityType); restApp.start(); } } diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/RestApp.java b/core/src/test/java/io/confluent/kafka/schemaregistry/RestApp.java index 62fb60d8d5c..8c04299537d 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/RestApp.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/RestApp.java @@ -36,18 +36,18 @@ public class RestApp { public Server restServer; public String restConnect; - public RestApp(int port, String zkConnect, String kafkaTopic) { - this(port, zkConnect, kafkaTopic, AvroCompatibilityLevel.NONE.name); + public RestApp(String zkConnect, String kafkaTopic) { + this(zkConnect, kafkaTopic, AvroCompatibilityLevel.NONE.name); } - public RestApp(int port, String zkConnect, String kafkaTopic, String compatibilityType) { - this(port, zkConnect, kafkaTopic, compatibilityType, true); + public RestApp(String zkConnect, String kafkaTopic, String compatibilityType) { + this(zkConnect, kafkaTopic, compatibilityType, true); } - public RestApp(int port, String zkConnect, String kafkaTopic, + public RestApp(String zkConnect, String kafkaTopic, String compatibilityType, boolean masterEligibility) { prop = new Properties(); - prop.setProperty(SchemaRegistryConfig.PORT_CONFIG, ((Integer) port).toString()); + prop.setProperty(SchemaRegistryConfig.PORT_CONFIG, "0"); prop.setProperty(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); prop.put(SchemaRegistryConfig.KAFKASTORE_TOPIC_CONFIG, kafkaTopic); prop.put(SchemaRegistryConfig.COMPATIBILITY_CONFIG, compatibilityType); diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistryTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistryTest.java deleted file mode 100644 
index ec44ab5bb84..00000000000 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistryTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Copyright 2016 Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.kafka.schemaregistry.storage; - -import org.junit.Test; - -import java.util.LinkedList; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -public class KafkaSchemaRegistryTest { - @Test - public void testGetPortForIdentityPrecedence() { - List listeners = new LinkedList(); - listeners.add("http://localhost:456"); - - int port = KafkaSchemaRegistry.getPortForIdentity(123, listeners); - assertEquals("Expected listeners to take precedence over port.", 456, port); - } - - @Test - public void testGetPortForIdentityNoListeners() { - List listeners = new LinkedList(); - int port = KafkaSchemaRegistry.getPortForIdentity(123, listeners); - assertEquals("Expected port to take the configured port value", 123, port); - } - - @Test - public void testGetPortForIdentityMultipleListeners() { - List listeners = new LinkedList(); - listeners.add("http://localhost:123"); - listeners.add("https://localhost:456"); - - int port = KafkaSchemaRegistry.getPortForIdentity(-1, listeners); - assertEquals("Expected first listener's port to be returned", 123, port); - } -} diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/storage/StoreUtils.java 
b/core/src/test/java/io/confluent/kafka/schemaregistry/storage/StoreUtils.java index 1958dead2a8..3e301d1be9d 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/storage/StoreUtils.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/storage/StoreUtils.java @@ -98,7 +98,7 @@ public static KafkaStore createAndInitKafkaStoreInstance( } KafkaStore kafkaStore = - new KafkaStore(config, new StringMessageHandler(), + new KafkaStore(config, 0, new StringMessageHandler(), StringSerializer.INSTANCE, inMemoryStore, new NoopKey().toString()); try { diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/zookeeper/MasterElectorTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/zookeeper/MasterElectorTest.java index c072277aec6..127fd6a5fdd 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/zookeeper/MasterElectorTest.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/zookeeper/MasterElectorTest.java @@ -58,13 +58,11 @@ public void testAutoFailover() throws Exception { List avroSchemas = TestUtils.getRandomCanonicalAvroString(4); // create schema registry instance 1 - final RestApp restApp1 = new RestApp(choosePort(), - zkConnect, KAFKASTORE_TOPIC); + final RestApp restApp1 = new RestApp(zkConnect, KAFKASTORE_TOPIC); restApp1.start(); // create schema registry instance 2 - final RestApp restApp2 = new RestApp(choosePort(), - zkConnect, KAFKASTORE_TOPIC); + final RestApp restApp2 = new RestApp(zkConnect, KAFKASTORE_TOPIC); restApp2.start(); assertTrue("Schema registry instance 1 should be the master", restApp1.isMaster()); assertFalse("Schema registry instance 2 shouldn't be the master", restApp2.isMaster()); @@ -250,8 +248,7 @@ public void testSlaveIsNeverMaster() throws Exception { Set slaveApps = new HashSet(); RestApp aSlave = null; for (int i = 0; i < numSlaves; i++) { - RestApp slave = new RestApp(choosePort(), - zkConnect, KAFKASTORE_TOPIC, + RestApp slave = new RestApp(zkConnect, 
KAFKASTORE_TOPIC, AvroCompatibilityLevel.NONE.name, false); slaveApps.add(slave); slave.start(); @@ -278,8 +275,7 @@ public void testSlaveIsNeverMaster() throws Exception { // Make a master-eligible 'cluster' final Set masterApps = new HashSet(); for (int i = 0; i < numMasters; i++) { - RestApp master = new RestApp(choosePort(), - zkConnect, KAFKASTORE_TOPIC, + RestApp master = new RestApp(zkConnect, KAFKASTORE_TOPIC, AvroCompatibilityLevel.NONE.name, true); masterApps.add(master); master.start(); @@ -329,8 +325,7 @@ public void testRegistrationOnMasterSlaveClusters() throws Exception { Set slaveApps = new HashSet(); RestApp aSlave = null; for (int i = 0; i < numSlaves; i++) { - RestApp slave = new RestApp(choosePort(), - zkConnect, KAFKASTORE_TOPIC, + RestApp slave = new RestApp(zkConnect, KAFKASTORE_TOPIC, AvroCompatibilityLevel.NONE.name, false); slaveApps.add(slave); slave.start(); @@ -354,8 +349,7 @@ public void testRegistrationOnMasterSlaveClusters() throws Exception { final Set masterApps = new HashSet(); RestApp aMaster = null; for (int i = 0; i < numMasters; i++) { - RestApp master = new RestApp(choosePort(), - zkConnect, KAFKASTORE_TOPIC, + RestApp master = new RestApp(zkConnect, KAFKASTORE_TOPIC, AvroCompatibilityLevel.NONE.name, true); masterApps.add(master); master.start(); @@ -443,8 +437,7 @@ public void testRegistrationOnMasterSlaveClusters() throws Exception { */ public void testIncreasingIdZkResetLow() throws Exception { // create schema registry instance 1 - final RestApp restApp1 = new RestApp(choosePort(), - zkConnect, KAFKASTORE_TOPIC); + final RestApp restApp1 = new RestApp(zkConnect, KAFKASTORE_TOPIC); restApp1.start(); List schemas = TestUtils.getRandomCanonicalAvroString(ID_BATCH_SIZE); String subject = "testSubject"; @@ -473,8 +466,7 @@ public void testIncreasingIdZkResetLow() throws Exception { maxId = newId; // Add another schema registry and trigger reelection - final RestApp restApp2 = new RestApp(choosePort(), - zkConnect, 
KAFKASTORE_TOPIC); + final RestApp restApp2 = new RestApp(zkConnect, KAFKASTORE_TOPIC); restApp2.start(); restApp1.stop(); Callable electionComplete = new Callable() { @@ -524,8 +516,7 @@ public void testIdBehaviorWithZkWithoutKafka() throws Exception { ZkUtils.createPersistentPath(zkClient, ZK_ID_COUNTER_PATH, "" + weirdInitialCounterValue); // Check that zookeeper id counter is updated sensibly during SchemaRegistry bootstrap process - final RestApp restApp = new RestApp(choosePort(), - zkConnect, KAFKASTORE_TOPIC); + final RestApp restApp = new RestApp(zkConnect, KAFKASTORE_TOPIC); restApp.start(); assertEquals("", 2 * ID_BATCH_SIZE, getZkIdCounter(zkClient)); } @@ -542,7 +533,7 @@ public void testIdBehaviorWithoutZkWithKafka() throws Exception { List schemas = TestUtils.getRandomCanonicalAvroString(numSchemas); String subject = "testSubject"; Set ids = new HashSet(); - RestApp restApp = new RestApp(choosePort(), zkConnect, KAFKASTORE_TOPIC); + RestApp restApp = new RestApp(zkConnect, KAFKASTORE_TOPIC); restApp.start(); for (String schema: schemas) { int id = restApp.restClient.registerSchema(schema, subject); @@ -557,7 +548,7 @@ public void testIdBehaviorWithoutZkWithKafka() throws Exception { zkClient.delete(ZK_ID_COUNTER_PATH); // start up another app instance and verify zk id node - restApp = new RestApp(choosePort(), zkConnect, KAFKASTORE_TOPIC); + restApp = new RestApp(zkConnect, KAFKASTORE_TOPIC); restApp.start(); zkIdCounter = getZkIdCounter(zkClient); assertEquals("ZK id counter was incorrectly initialized.", 2 * ID_BATCH_SIZE, zkIdCounter); @@ -567,7 +558,7 @@ public void testIdBehaviorWithoutZkWithKafka() throws Exception { @Test /** Verify expected value of zk schema id counter when schema registry starts up. 
*/ public void testZkCounterOnStartup() throws Exception { - RestApp restApp = new RestApp(choosePort(), zkConnect, KAFKASTORE_TOPIC); + RestApp restApp = new RestApp(zkConnect, KAFKASTORE_TOPIC); restApp.start(); int zkIdCounter = getZkIdCounter(zkClient);