Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ public CachedSchemaRegistryClient(
this(new RestService(baseUrls), cacheCapacity, providers, originals, httpHeaders);
}

public CachedSchemaRegistryClient(
String baseUrls,
int cacheCapacity,
List<SchemaProvider> providers,
Map<String, ?> originals,
Map<String, String> httpHeaders) {
this(new RestService(baseUrls), cacheCapacity, providers, originals, httpHeaders);
}

public CachedSchemaRegistryClient(
RestService restService,
int cacheCapacity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,24 @@ public static SchemaRegistryClient newClient(
);
}
}

public static SchemaRegistryClient newClient(
String baseUrls,
int cacheCapacity,
List<SchemaProvider> providers,
Map<String, ?> configs,
Map<String, String> httpHeaders) {
List<String> mockScopes = MockSchemaRegistry.validateAndMaybeGetMockScope(baseUrls);
if (mockScopes != null) {
return MockSchemaRegistry.getClientForScope(mockScopes, providers);
} else {
return new CachedSchemaRegistryClient(
baseUrls,
cacheCapacity,
providers,
configs,
httpHeaders
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.common.config.ConfigException;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -92,6 +93,29 @@ public static SchemaRegistryClient getClientForScope(final String scope,
return SCOPED_CLIENTS.get(scope);
}

/**
* Get a client for a mocked Schema Registry. The {@code scope} represents a particular registry,
* so operations on one scope will never affect another.
*
* @param scopes Identifies a logically independent Schema Registry instance. It's similar to a
* List of schema registry URLs, in that two different Schema Registry deployments
* have two different URLs, except that these registries are only mocked, so they
* have no actual URL.
* @param providers A list of schema providers.
* @return A client for the specified scope.
*/
public static SchemaRegistryClient getClientForScope(final List<String> scopes,
List<SchemaProvider> providers) {
synchronized (SCOPED_CLIENTS) {
for (String scope : scopes) {
if (!SCOPED_CLIENTS.containsKey(scope)) {
SCOPED_CLIENTS.put(scope, new MockSchemaRegistryClient(providers));
}
}
}
return SCOPED_CLIENTS.get(scopes.get(0));
}

/**
* Destroy the mocked registry corresponding to the scope. Subsequent clients for the same scope
* will have a completely blank slate.
Expand Down Expand Up @@ -134,4 +158,32 @@ public static String validateAndMaybeGetMockScope(final List<String> urls) {
return mockScopes.get(0);
}
}

public static List<String> validateAndMaybeGetMockScope(final String baseUrls) {
final List<String> mockScopes = new LinkedList<>();
List<String> urls = parseBaseUrl(baseUrls);
for (final String url : urls) {
if (url.startsWith(MOCK_URL_PREFIX)) {
mockScopes.add(url.substring(MOCK_URL_PREFIX.length()));
}
}

if (mockScopes.isEmpty()) {
return null;
} else if (urls.size() > mockScopes.size()) {
throw new ConfigException(
"Cannot mix mock and real urls for 'schema.registry.url'. Got: " + urls
);
} else {
return mockScopes;
}
}

private static List<String> parseBaseUrl(String baseUrls) {
List<String> urls = Arrays.asList(baseUrls.split("\\s*,\\s*"));
if (urls.isEmpty()) {
throw new IllegalArgumentException("Missing required schema registry url list");
}
return urls;
}
}