Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import javax.ws.rs.core.Configurable;
Expand All @@ -32,6 +35,7 @@
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer;
import io.confluent.rest.Application;
import io.confluent.rest.RestConfig;
import io.confluent.rest.RestConfigException;

public class SchemaRegistryRestApplication extends Application<SchemaRegistryConfig> {
Expand All @@ -50,9 +54,13 @@ public SchemaRegistryRestApplication(SchemaRegistryConfig config) {
@Override
public void setupResources(Configurable<?> config, SchemaRegistryConfig schemaRegistryConfig) {
try {
schemaRegistry = new KafkaSchemaRegistry(schemaRegistryConfig,
schemaRegistry = new KafkaSchemaRegistry(this,
new SchemaRegistrySerializer());
schemaRegistry.init();
// This is a bit of a hack to deal with startup ordering issues when port == 0. See note in
// onStarted.
if (listeners().get(0).getPort() > 0) {
schemaRegistry.init();
}
} catch (SchemaRegistryException e) {
log.error("Error starting the schema registry", e);
System.exit(1);
Expand All @@ -65,6 +73,27 @@ public void setupResources(Configurable<?> config, SchemaRegistryConfig schemaRe
config.register(new CompatibilityResource(schemaRegistry));
}

@Override
public void onStarted() {
  // Normally schemaRegistry.init() runs in setupResources, before Jetty starts. Unit/integration
  // tests bind to port == 0 (to avoid fixed-port collisions and enable parallel runs), and in
  // that case the real port — which is part of the registry's identity — is unknown until Jetty
  // completes startup, so initialization is deferred to here. The reordering is safe for those
  // tests because the init steps block: by the time Server.start() returns, everything is
  // initialized. Real deployments use a fixed port and initialize in setupResources so the node
  // can handle requests as soon as Jetty begins listening.
  if (listeners().get(0).getPort() != 0) {
    return;
  }
  try {
    schemaRegistry.init();
  } catch (SchemaRegistryException e) {
    log.error("Error starting the schema registry", e);
    System.exit(1);
  }
}

@Override
public void onShutdown() {
schemaRegistry.close();
Expand All @@ -74,4 +103,13 @@ public void onShutdown() {
/**
 * Returns the {@link KafkaSchemaRegistry} instance created in {@code setupResources}.
 */
public KafkaSchemaRegistry schemaRegistry() {
return schemaRegistry;
}

/**
 * Resolves the configured listeners into a list of URIs, combining the listeners config with
 * the (deprecated) port config. The first entry's port is used for this node's identity.
 */
private List<URI> listeners() {
  List<String> configuredListeners = config.getList(RestConfig.LISTENERS_CONFIG);
  int deprecatedPort = config.getInt(RestConfig.PORT_CONFIG);
  List<String> supportedSchemes = Arrays.asList("http", "https");
  return parseListeners(configuredListeners, deprecatedPort, supportedSchemes, "http");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,18 @@ public class KafkaSchemaRegistry implements SchemaRegistry {
public static final String ZOOKEEPER_SCHEMA_ID_COUNTER = "/schema_id_counter";
private static final int ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_WRITE_RETRY_BACKOFF_MS = 50;
private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);

private final Application<SchemaRegistryConfig> app;
final Map<Integer, SchemaKey> guidToSchemaKey;
final Map<MD5, SchemaIdAndSubjects> schemaHashToGuid;
private final KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore;
private final Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer;
private final SchemaRegistryIdentity myIdentity;
private final Object masterLock = new Object();
private final AvroCompatibilityLevel defaultCompatibilityLevel;
private final String schemaRegistryZkNamespace;
private final String kafkaClusterZkUrl;
private final int zkSessionTimeoutMs;
private final int kafkaStoreTimeoutMs;
private final boolean isEligibleForMasterElector;
private String schemaRegistryZkUrl;
private KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore;
private SchemaRegistryIdentity myIdentity;
private ZkUtils zkUtils;
private SchemaRegistryIdentity masterIdentity;
private RestService masterRestService;
Expand All @@ -119,28 +118,39 @@ public class KafkaSchemaRegistry implements SchemaRegistry {
// data is already in the kafkastore.
private int maxIdInKafkaStore = -1;

public KafkaSchemaRegistry(SchemaRegistryConfig config,
public KafkaSchemaRegistry(Application<SchemaRegistryConfig> app,
Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer)
throws SchemaRegistryException {
String host = config.getString(SchemaRegistryConfig.HOST_NAME_CONFIG);
int port = getPortForIdentity(config.getInt(SchemaRegistryConfig.PORT_CONFIG),
config.getList(RestConfig.LISTENERS_CONFIG));
this.app = app;
SchemaRegistryConfig config = app.getConfiguration();
this.schemaRegistryZkNamespace = config.getString(SchemaRegistryConfig.SCHEMAREGISTRY_ZK_NAMESPACE);
this.isEligibleForMasterElector = config.getBoolean(SchemaRegistryConfig.MASTER_ELIGIBILITY);
this.myIdentity = new SchemaRegistryIdentity(host, port, isEligibleForMasterElector);
this.kafkaClusterZkUrl =
config.getString(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG);
this.zkSessionTimeoutMs =
config.getInt(SchemaRegistryConfig.KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG);
this.kafkaStoreTimeoutMs =
this.kafkaStoreTimeoutMs =
config.getInt(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG);
this.serializer = serializer;
this.defaultCompatibilityLevel = config.compatibilityType();
this.guidToSchemaKey = new HashMap<Integer, SchemaKey>();
this.schemaHashToGuid = new HashMap<MD5, SchemaIdAndSubjects>();
}

@Override
public void init() throws SchemaRegistryInitializationException {
// A Schema Registry instance's identity is in part the port it listens on. Multiple listeners
// are possible, so the first is used for the identity (and the connection to be used to
// forward requests if this node is the master). In theory, any port from any listener would
// be sufficient. Choosing the first, instead of say the last,
// is arbitrary.
//
// To support tests that dynamically allocate ports (i.e. set value to 0), we are careful to
// use the actually listening port rather than the PORT or LISTENER settings directly.
SchemaRegistryConfig config = app.getConfiguration();
String host = config.getString(SchemaRegistryConfig.HOST_NAME_CONFIG);
int listeningPort = app.localPorts().get(0);
this.myIdentity = new SchemaRegistryIdentity(host, listeningPort, isEligibleForMasterElector);
kafkaStore =
new KafkaStore<SchemaRegistryKey, SchemaRegistryValue>(
config,
listeningPort,
new KafkaStoreMessageHandler(this),
this.serializer,
new InMemoryStore<SchemaRegistryKey, SchemaRegistryValue>(), new NoopKey());
Expand All @@ -161,28 +171,7 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config,
+ " node where all register schema and config update requests are "
+ "served.");
this.masterNodeSensor.add(m, new Gauge());
}

/**
* A Schema Registry instance's identity is in part the port it listens on. Currently the
* port can either be configured via the deprecated `port` configuration, or via the `listeners`
* configuration.
*
* This method uses `Application.parseListeners()` from `rest-utils` to get a list of listeners, and
* returns the port of the first listener to be used for the instance's identity.
*
* In theory, any port from any listener would be sufficient. Choosing the first, instead of say the last,
* is arbitrary.
*/
// TODO: once RestConfig.PORT_CONFIG is deprecated, remove the port parameter.
static int getPortForIdentity(int port, List<String> configuredListeners) {
List<URI> listeners = Application.parseListeners(configuredListeners, port,
Arrays.asList("http", "https"), "http");
return listeners.get(0).getPort();
}

@Override
public void init() throws SchemaRegistryInitializationException {
try {
kafkaStore.init();
} catch (StoreInitializationException e) {
Expand All @@ -203,13 +192,17 @@ public void init() throws SchemaRegistryInitializationException {
}

private void createZkNamespace() {
int zkSessionTimeoutMs =
app.getConfiguration().getInt(SchemaRegistryConfig.KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG);
String kafkaClusterZkUrl =
app.getConfiguration().getString(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG);
int kafkaNamespaceIndex = kafkaClusterZkUrl.indexOf("/");
String zkConnForNamespaceCreation = kafkaNamespaceIndex > 0 ?
kafkaClusterZkUrl.substring(0, kafkaNamespaceIndex) :
kafkaClusterZkUrl;

String schemaRegistryNamespace = "/" + schemaRegistryZkNamespace;
schemaRegistryZkUrl = zkConnForNamespaceCreation + schemaRegistryNamespace;
String schemaRegistryZkUrl = zkConnForNamespaceCreation + schemaRegistryNamespace;

ZkUtils zkUtilsForNamespaceCreation = ZkUtils.apply(
zkConnForNamespaceCreation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class KafkaStore<K, V> implements Store<K, V> {
private final SchemaRegistryConfig config;

public KafkaStore(SchemaRegistryConfig config,
int identityPort,
StoreUpdateHandler<K, V> storeUpdateHandler,
Serializer<K, V> serializer,
Store<K, V> localStore,
Expand All @@ -93,11 +94,9 @@ public KafkaStore(SchemaRegistryConfig config,
this.topic = config.getString(SchemaRegistryConfig.KAFKASTORE_TOPIC_CONFIG);
this.desiredReplicationFactor =
config.getInt(SchemaRegistryConfig.KAFKASTORE_TOPIC_REPLICATION_FACTOR_CONFIG);
int port = KafkaSchemaRegistry.getPortForIdentity(config.getInt(SchemaRegistryConfig.PORT_CONFIG),
config.getList(RestConfig.LISTENERS_CONFIG));
this.groupId = String.format("schema-registry-%s-%d",
config.getString(SchemaRegistryConfig.HOST_NAME_CONFIG),
port);
identityPort);
initTimeout = config.getInt(SchemaRegistryConfig.KAFKASTORE_INIT_TIMEOUT_CONFIG);
timeout = config.getInt(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG);
this.storeUpdateHandler = storeUpdateHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,6 @@ public abstract class ClusterTestHarness {
public static final String KAFKASTORE_TOPIC = SchemaRegistryConfig.DEFAULT_KAFKASTORE_TOPIC;
protected static final Option<Properties> SASL_PROPERTIES = Option$.MODULE$.<Properties>empty();

/**
 * Choose a number of random available ports.
 *
 * Binds {@code count} ephemeral server sockets (port 0) simultaneously — which guarantees the
 * returned ports are pairwise distinct — records their local ports, then releases them. Usual
 * caveat: another process may grab a port between this call and its later use.
 *
 * @param count number of ports to choose
 * @return the chosen ports
 * @throws RuntimeException wrapping any IOException raised while binding
 */
public static int[] choosePorts(int count) {
  ServerSocket[] sockets = new ServerSocket[count];
  int[] ports = new int[count];
  try {
    for (int i = 0; i < count; i++) {
      // Port 0 asks the OS for a free ephemeral port.
      sockets[i] = new ServerSocket(0, 0, InetAddress.getByName("0.0.0.0"));
      ports[i] = sockets[i].getLocalPort();
    }
    return ports;
  } catch (IOException e) {
    throw new RuntimeException(e);
  } finally {
    // Close in finally so already-bound sockets are not leaked when a later bind fails
    // (the original closed them only on the success path).
    for (ServerSocket socket : sockets) {
      if (socket != null) {
        try {
          socket.close();
        } catch (IOException ignored) {
          // best-effort cleanup; the port number was already recorded
        }
      }
    }
  }
}

/**
 * Choose a single available port.
 *
 * @return one randomly chosen free port
 */
public static int choosePort() {
  int[] ports = choosePorts(1);
  return ports[0];
}

private int numBrokers;
private boolean setupRestApp;
private String compatibilityType;
Expand Down Expand Up @@ -139,7 +113,7 @@ public void setUp() throws Exception {
getSecurityProtocol());

if (setupRestApp) {
restApp = new RestApp(choosePort(), zkConnect, KAFKASTORE_TOPIC, compatibilityType);
restApp = new RestApp(zkConnect, KAFKASTORE_TOPIC, compatibilityType);
restApp.start();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ public class RestApp {
public Server restServer;
public String restConnect;

public RestApp(int port, String zkConnect, String kafkaTopic) {
this(port, zkConnect, kafkaTopic, AvroCompatibilityLevel.NONE.name);
public RestApp(String zkConnect, String kafkaTopic) {
this(zkConnect, kafkaTopic, AvroCompatibilityLevel.NONE.name);
}

public RestApp(int port, String zkConnect, String kafkaTopic, String compatibilityType) {
this(port, zkConnect, kafkaTopic, compatibilityType, true);
public RestApp(String zkConnect, String kafkaTopic, String compatibilityType) {
this(zkConnect, kafkaTopic, compatibilityType, true);
}

public RestApp(int port, String zkConnect, String kafkaTopic,
public RestApp(String zkConnect, String kafkaTopic,
String compatibilityType, boolean masterEligibility) {
prop = new Properties();
prop.setProperty(SchemaRegistryConfig.PORT_CONFIG, ((Integer) port).toString());
prop.setProperty(SchemaRegistryConfig.PORT_CONFIG, "0");
prop.setProperty(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
prop.put(SchemaRegistryConfig.KAFKASTORE_TOPIC_CONFIG, kafkaTopic);
prop.put(SchemaRegistryConfig.COMPATIBILITY_CONFIG, compatibilityType);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static KafkaStore<String, String> createAndInitKafkaStoreInstance(
}

KafkaStore<String, String> kafkaStore =
new KafkaStore<String, String>(config, new StringMessageHandler(),
new KafkaStore<String, String>(config, 0, new StringMessageHandler(),
StringSerializer.INSTANCE,
inMemoryStore, new NoopKey().toString());
try {
Expand Down
Loading