diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java index 5304ee6b0ae..8dde7565caf 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java @@ -165,6 +165,15 @@ public CachedSchemaRegistryClient( this(new RestService(baseUrls), cacheCapacity, providers, originals, httpHeaders); } + public CachedSchemaRegistryClient( + String baseUrls, + int cacheCapacity, + List providers, + Map originals, + Map httpHeaders) { + this(new RestService(baseUrls), cacheCapacity, providers, originals, httpHeaders); + } + public CachedSchemaRegistryClient( RestService restService, int cacheCapacity, diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientFactory.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientFactory.java index 8882e817b4a..82f72002386 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientFactory.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientFactory.java @@ -42,4 +42,24 @@ public static SchemaRegistryClient newClient( ); } } + + public static SchemaRegistryClient newClient( + String baseUrls, + int cacheCapacity, + List providers, + Map configs, + Map httpHeaders) { + List mockScopes = MockSchemaRegistry.validateAndMaybeGetMockScope(baseUrls); + if (mockScopes != null) { + return MockSchemaRegistry.getClientForScope(mockScopes, providers); + } else { + return new CachedSchemaRegistryClient( + baseUrls, + cacheCapacity, + providers, + configs, + httpHeaders + ); + } + } } diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/testutil/MockSchemaRegistry.java b/client/src/main/java/io/confluent/kafka/schemaregistry/testutil/MockSchemaRegistry.java index 6ddf7517412..01f142aed6d 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/testutil/MockSchemaRegistry.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/testutil/MockSchemaRegistry.java @@ -21,6 +21,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.apache.kafka.common.config.ConfigException; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -92,6 +93,29 @@ public static SchemaRegistryClient getClientForScope(final String scope, return SCOPED_CLIENTS.get(scope); } + /** + * Get a client for a mocked Schema Registry. The {@code scope} represents a particular registry, + * so operations on one scope will never affect another. + * + * @param scopes Identifies a logically independent Schema Registry instance. It's similar to a + * List of schema registry URLs, in that two different Schema Registry deployments + * have two different URLs, except that these registries are only mocked, so they + * have no actual URL. + * @param providers A list of schema providers. + * @return A client for the specified scope. + */ + public static SchemaRegistryClient getClientForScope(final List scopes, + List providers) { + synchronized (SCOPED_CLIENTS) { + for (String scope : scopes) { + if (!SCOPED_CLIENTS.containsKey(scope)) { + SCOPED_CLIENTS.put(scope, new MockSchemaRegistryClient(providers)); + } + } + } + return SCOPED_CLIENTS.get(scopes.get(0)); + } + /** * Destroy the mocked registry corresponding to the scope. Subsequent clients for the same scope * will have a completely blank slate. @@ -134,4 +158,32 @@ public static String validateAndMaybeGetMockScope(final List urls) { return mockScopes.get(0); } } + + public static List validateAndMaybeGetMockScope(final String baseUrls) { + final List mockScopes = new LinkedList<>(); + List urls = parseBaseUrl(baseUrls); + for (final String url : urls) { + if (url.startsWith(MOCK_URL_PREFIX)) { + mockScopes.add(url.substring(MOCK_URL_PREFIX.length())); + } + } + + if (mockScopes.isEmpty()) { + return null; + } else if (urls.size() > mockScopes.size()) { + throw new ConfigException( + "Cannot mix mock and real urls for 'schema.registry.url'. Got: " + urls + ); + } else { + return mockScopes; + } + } + + private static List parseBaseUrl(String baseUrls) { + List urls = Arrays.asList(baseUrls.split("\\s*,\\s*")); + if (urls.isEmpty()) { + throw new IllegalArgumentException("Missing required schema registry url list"); + } + return urls; + } }