permissions) throws IOException
- {
- final Request request = new Request(
- HttpMethod.POST,
- new URL(
- StringUtils.format(
- "%s/roles/%s/permissions/",
- getAuthorizerURL(),
- StringUtils.urlEncode(role)
- )
- )
- ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(permissions));
- Assert.assertEquals(HttpResponseStatus.OK, sendRequest(request).getStatus());
- }
-
- private StatusResponseHolder sendRequest(Request request)
- {
- try {
- final StatusResponseHolder response = httpClient.go(
- request,
- responseHandler
- ).get();
-
- if (!response.getStatus().equals(HttpResponseStatus.OK)) {
- throw new ISE(
- "Error while creating users status [%s] content [%s]",
- response.getStatus(),
- response.getContent()
- );
- }
-
- return response;
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private String getAuthenticatorURL()
- {
- return StringUtils.format(
- "%s/druid-ext/basic-security/authentication/db/basic",
- coordinator
- );
- }
-
- private String getAuthorizerURL()
- {
- return StringUtils.format(
- "%s/druid-ext/basic-security/authorization/db/basic",
- coordinator
- );
- }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITCoordinatorOverlordProxyAuthTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITCoordinatorOverlordProxyAuthTest.java
deleted file mode 100644
index f6cc85ab1704..000000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITCoordinatorOverlordProxyAuthTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.security;
-
-import com.google.inject.Inject;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.testng.Assert;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.SECURITY)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITCoordinatorOverlordProxyAuthTest
-{
- @Inject
- CoordinatorResourceTestClient coordinatorClient;
-
- @Test
- public void testProxyAuth()
- {
- HttpResponseStatus responseStatus = coordinatorClient.getProxiedOverlordScalingResponseStatus();
- Assert.assertEquals(responseStatus, HttpResponseStatus.OK);
- }
-}
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorServiceClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorServiceClient.java
deleted file mode 100644
index a50d9b19862a..000000000000
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorServiceClient.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client.coordinator;
-
-import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.rpc.ServiceClient;
-import org.apache.druid.rpc.ServiceClientFactory;
-import org.apache.druid.rpc.ServiceLocator;
-import org.apache.druid.rpc.StandardRetryPolicy;
-
-/**
- * Wrapper over {@link ServiceClient} used to send requests to the Coordinator.
- *
- * This client should be used to hit Coordinator APIs added by extensions.
- * For core Coordinator APIs, use {@link CoordinatorClient} instead.
- */
-public class CoordinatorServiceClient
-{
- private final ServiceClient serviceClient;
-
- public CoordinatorServiceClient(
- ServiceClientFactory clientFactory,
- ServiceLocator coordinatorServiceLocator,
- int maxRetryAttempts
- )
- {
- this.serviceClient = clientFactory.makeClient(
- NodeRole.COORDINATOR.getJsonName(),
- coordinatorServiceLocator,
- StandardRetryPolicy.builder().maxAttempts(maxRetryAttempts).build()
- );
- }
-
- public ServiceClient getServiceClient()
- {
- return serviceClient;
- }
-}
diff --git a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
index d99020af708b..203886f99fea 100644
--- a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
+++ b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
@@ -28,7 +28,6 @@
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.CoordinatorClientImpl;
-import org.apache.druid.client.coordinator.CoordinatorServiceClient;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
@@ -40,6 +39,7 @@
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.rpc.DiscoveryServiceLocator;
+import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceClientFactoryImpl;
import org.apache.druid.rpc.ServiceLocator;
@@ -77,23 +77,29 @@ public ServiceLocator makeOverlordServiceLocator(final DruidNodeDiscoveryProvide
}
@Provides
- @LazySingleton
- public OverlordClient makeOverlordClient(
- @Json final ObjectMapper jsonMapper,
+ @IndexingService
+ public ServiceClient makeServiceClientForOverlord(
@EscalatedGlobal final ServiceClientFactory clientFactory,
@IndexingService final ServiceLocator serviceLocator
)
{
- return new OverlordClientImpl(
- clientFactory.makeClient(
- NodeRole.OVERLORD.getJsonName(),
- serviceLocator,
- StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build()
- ),
- jsonMapper
+ return clientFactory.makeClient(
+ NodeRole.OVERLORD.getJsonName(),
+ serviceLocator,
+ StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build()
);
}
+ @Provides
+ @LazySingleton
+ public OverlordClient makeOverlordClient(
+ @Json final ObjectMapper jsonMapper,
+ @IndexingService final ServiceClient serviceClient
+ )
+ {
+ return new OverlordClientImpl(serviceClient, jsonMapper);
+ }
+
@Provides
@ManageLifecycle
@Coordinator
@@ -103,36 +109,27 @@ public ServiceLocator makeCoordinatorServiceLocator(final DruidNodeDiscoveryProv
}
@Provides
- @LazySingleton
- public CoordinatorClient makeCoordinatorClient(
- @Json final ObjectMapper jsonMapper,
+ @Coordinator
+ public ServiceClient makeServiceClientForCoordinator(
@EscalatedGlobal final ServiceClientFactory clientFactory,
@Coordinator final ServiceLocator serviceLocator
)
{
- return new CoordinatorClientImpl(
- clientFactory.makeClient(
- NodeRole.COORDINATOR.getJsonName(),
- serviceLocator,
- StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build()
- ),
- jsonMapper
+ return clientFactory.makeClient(
+ NodeRole.COORDINATOR.getJsonName(),
+ serviceLocator,
+ StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build()
);
}
- /**
- * Creates a {@link CoordinatorServiceClient} used by extensions to send
- * requests to the Coordinator. For core Coordinator APIs,
- * {@link CoordinatorClient} should be used instead.
- */
@Provides
@LazySingleton
- public static CoordinatorServiceClient createCoordinatorServiceClient(
- @EscalatedGlobal ServiceClientFactory clientFactory,
- @Coordinator ServiceLocator serviceLocator
+ public CoordinatorClient makeCoordinatorClient(
+ @Json final ObjectMapper jsonMapper,
+ @Coordinator final ServiceClient serviceClient
)
{
- return new CoordinatorServiceClient(clientFactory, serviceLocator, CLIENT_MAX_ATTEMPTS);
+ return new CoordinatorClientImpl(serviceClient, jsonMapper);
}
@Provides
@@ -144,23 +141,29 @@ public ServiceLocator makeBrokerServiceLocator(final DruidNodeDiscoveryProvider
}
@Provides
- @LazySingleton
- public BrokerClient makeBrokerClient(
- @Json final ObjectMapper jsonMapper,
+ @Broker
+ public ServiceClient makeServiceClientForBroker(
@EscalatedGlobal final ServiceClientFactory clientFactory,
@Broker final ServiceLocator serviceLocator
)
{
- return new BrokerClientImpl(
- clientFactory.makeClient(
- NodeRole.BROKER.getJsonName(),
- serviceLocator,
- StandardRetryPolicy.builder().maxAttempts(ServiceClientModule.CLIENT_MAX_ATTEMPTS).build()
- ),
- jsonMapper
+ return clientFactory.makeClient(
+ NodeRole.BROKER.getJsonName(),
+ serviceLocator,
+ StandardRetryPolicy.builder().maxAttempts(ServiceClientModule.CLIENT_MAX_ATTEMPTS).build()
);
}
+ @Provides
+ @LazySingleton
+ public BrokerClient makeBrokerClient(
+ @Json final ObjectMapper jsonMapper,
+ @Broker final ServiceClient serviceClient
+ )
+ {
+ return new BrokerClientImpl(serviceClient, jsonMapper);
+ }
+
public static ServiceClientFactory makeServiceClientFactory(@EscalatedGlobal final HttpClient httpClient)
{
final ScheduledExecutorService connectExec =
diff --git a/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java b/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java
index a084a1e53139..f039a023ad9f 100644
--- a/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java
+++ b/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java
@@ -22,15 +22,19 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import com.google.inject.Key;
+import org.apache.druid.client.broker.Broker;
import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.client.coordinator.CoordinatorServiceClient;
+import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocator;
import org.apache.druid.rpc.indexing.OverlordClient;
@@ -108,6 +112,18 @@ public void testGetBrokerClient()
@Test
public void testGetCoordinatorServiceClient()
{
- assertNotNull(injector.getInstance(CoordinatorServiceClient.class));
+ assertNotNull(injector.getInstance(Key.get(ServiceClient.class, Coordinator.class)));
+ }
+
+ @Test
+ public void testGetOverlordServiceClient()
+ {
+ assertNotNull(injector.getInstance(Key.get(ServiceClient.class, IndexingService.class)));
+ }
+
+ @Test
+ public void testGetBrokerServiceClient()
+ {
+ assertNotNull(injector.getInstance(Key.get(ServiceClient.class, Broker.class)));
}
}
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index 954e0f117c43..878cc055ffdb 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -22,9 +22,9 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
-import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -66,28 +66,60 @@
* @see #onLeaderOverlord(Function)
* @see #runSql(String, Object...)
*/
-public class EmbeddedClusterApis
+public class EmbeddedClusterApis implements EmbeddedResource
{
private final EmbeddedDruidCluster cluster;
+ private EmbeddedServiceClient client;
EmbeddedClusterApis(EmbeddedDruidCluster cluster)
{
this.cluster = cluster;
}
+ @Override
+ public void start() throws Exception
+ {
+ this.client = EmbeddedServiceClient.create(cluster, null);
+ }
+
+ @Override
+ public void stop() throws Exception
+ {
+ if (client != null) {
+ client.stop();
+ client = null;
+ }
+ }
+
+ /**
+ * Client used for all the API calls made by this {@link EmbeddedClusterApis}.
+ */
+ public EmbeddedServiceClient serviceClient()
+ {
+ return Objects.requireNonNull(
+ client,
+ "Service clients are not initialized. Ensure that the cluster has started properly."
+ );
+ }
+
public T onLeaderCoordinator(Function> coordinatorApi)
{
- return getResult(coordinatorApi.apply(cluster.leaderCoordinator()));
+ return client.onLeaderCoordinator(coordinatorApi);
}
public T onLeaderCoordinatorSync(Function coordinatorApi)
{
- return coordinatorApi.apply(cluster.leaderCoordinator());
+ return client.onLeaderCoordinatorSync(coordinatorApi);
}
public T onLeaderOverlord(Function> overlordApi)
{
- return getResult(overlordApi.apply(cluster.leaderOverlord()));
+ return client.onLeaderOverlord(overlordApi);
+ }
+
+ public T onAnyBroker(Function> brokerApi)
+ {
+ return client.onAnyBroker(brokerApi);
}
/**
@@ -99,8 +131,8 @@ public T onLeaderOverlord(Function> over
public String runSql(String sql, Object... args)
{
try {
- return getResult(
- cluster.anyBroker().submitSqlQuery(
+ return onAnyBroker(
+ b -> b.submitSqlQuery(
new ClientSqlQuery(
StringUtils.format(sql, args),
ResultFormat.CSV.name(),
@@ -162,9 +194,11 @@ public void submitTask(Task task)
*/
public void waitForTaskToSucceed(String taskId, EmbeddedOverlord overlord)
{
+ TaskStatus taskStatus = waitForTaskToFinish(taskId, overlord);
Assertions.assertEquals(
TaskState.SUCCESS,
- waitForTaskToFinish(taskId, overlord).getStatusCode()
+ taskStatus.getStatusCode(),
+ StringUtils.format("Task[%s] failed with error[%s]", taskId, taskStatus.getErrorMsg())
);
}
@@ -380,11 +414,6 @@ public static List createAlignedIntervals(
return alignedIntervals;
}
- private static T getResult(ListenableFuture future)
- {
- return FutureUtils.getUnchecked(future, true);
- }
-
@FunctionalInterface
public interface TaskBuilder
{
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index 3fb2cba18bc5..dabceaff4b98 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -21,13 +21,10 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.druid.client.broker.BrokerClient;
-import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.testing.embedded.derby.InMemoryDerbyModule;
import org.apache.druid.testing.embedded.derby.InMemoryDerbyResource;
@@ -71,7 +68,7 @@
* cluster.stop();
*
*/
-public class EmbeddedDruidCluster implements ClusterReferencesProvider, EmbeddedResource
+public class EmbeddedDruidCluster implements EmbeddedResource
{
private static final Logger log = new Logger(EmbeddedDruidCluster.class);
@@ -250,6 +247,12 @@ public void start() throws Exception
{
Preconditions.checkArgument(!servers.isEmpty(), "Cluster must have at least one embedded Druid server");
+ // Add clusterApis as the last entry in the resources list, so that the
+ // EmbeddedServiceClient is initialized after mappers have been injected into the servers
+ if (!startedFirstDruidServer) {
+ resources.add(clusterApis);
+ }
+
// Start the resources in order
for (EmbeddedResource resource : resources) {
try {
@@ -314,25 +317,12 @@ public String runSql(String sql, Object... args)
return clusterApis.runSql(sql, args);
}
- @Override
- public CoordinatorClient leaderCoordinator()
- {
- return findServerOfType(EmbeddedCoordinator.class).bindings().leaderCoordinator();
- }
-
- @Override
- public OverlordClient leaderOverlord()
- {
- return findServerOfType(EmbeddedOverlord.class).bindings().leaderOverlord();
- }
-
- @Override
- public BrokerClient anyBroker()
+ EmbeddedDruidServer> anyServer()
{
- return findServerOfType(EmbeddedBroker.class).bindings().anyBroker();
+ return servers.get(0);
}
- private > EmbeddedDruidServer findServerOfType(
+ > S findServerOfType(
Class serverType
)
{
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
new file mode 100644
index 000000000000..cf7322b1b8ff
--- /dev/null
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.broker.Broker;
+import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceClientFactoryImpl;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.guice.ServiceClientModule;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.security.Escalator;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * Client to make requests to various services in an embedded test cluster.
+ *
+ * @see #onLeaderOverlord(Function)
+ * @see #onLeaderCoordinator(Function)
+ * @see #onAnyBroker(Function)
+ */
+public class EmbeddedServiceClient
+{
+ private final EmbeddedDruidCluster cluster;
+ private final ServiceClientModule module;
+
+ private final ServiceClient coordinatorServiceClient;
+ private final ServiceClient overlordServiceClient;
+ private final ServiceClient brokerServiceClient;
+
+ private final ScheduledExecutorService clientConnectExec;
+ private final StatusResponseHandler responseHandler;
+
+ private EmbeddedServiceClient(EmbeddedDruidCluster cluster, Escalator escalator)
+ {
+ // Use a ServiceClientModule to create the various clients
+ this.module = new ServiceClientModule();
+ this.cluster = cluster;
+ this.clientConnectExec = ScheduledExecutors.fixed(4, "ServiceClientFactory-%d");
+ this.responseHandler = StatusResponseHandler.getInstance();
+
+ // If this server is stopped, the client becomes invalid
+ final EmbeddedDruidServer> anyServer = cluster.anyServer();
+
+ final HttpClient escalatedHttpClient =
+ escalator == null
+ ? anyServer.bindings().escalatedHttpClient()
+ : escalator.createEscalatedClient(anyServer.bindings().globalHttpClient());
+
+ // Create service clients
+ final ServiceClientFactory factory = new ServiceClientFactoryImpl(escalatedHttpClient, clientConnectExec);
+ this.overlordServiceClient = module.makeServiceClientForOverlord(
+ factory,
+ anyServer.bindings().getInstance(ServiceLocator.class, IndexingService.class)
+ );
+
+ this.brokerServiceClient = module.makeServiceClientForBroker(
+ factory,
+ anyServer.bindings().getInstance(ServiceLocator.class, Broker.class)
+ );
+ this.coordinatorServiceClient = module.makeServiceClientForCoordinator(
+ factory,
+ anyServer.bindings().getInstance(ServiceLocator.class, Coordinator.class)
+ );
+ }
+
+ /**
+ * Creates a client that uses the {@link Escalator} bound to the embedded servers.
+ */
+ public static EmbeddedServiceClient create(EmbeddedDruidCluster cluster)
+ {
+ return new EmbeddedServiceClient(cluster, null);
+ }
+
+ /**
+ * Creates a client using the specified {@link Escalator}. All requests made by this
+ * client will use the given escalator.
+ */
+ public static EmbeddedServiceClient create(EmbeddedDruidCluster cluster, Escalator escalator)
+ {
+ return new EmbeddedServiceClient(cluster, escalator);
+ }
+
+ /**
+ * Stops the executor service used by this client.
+ */
+ public void stop() throws InterruptedException
+ {
+ clientConnectExec.shutdownNow();
+ clientConnectExec.awaitTermination(1, TimeUnit.MINUTES);
+ }
+
+ @Nullable
+ public T onLeaderCoordinator(
+ Function request,
+ TypeReference resultType
+ )
+ {
+ return makeRequest(request, resultType, coordinatorServiceClient, getMapper(EmbeddedCoordinator.class));
+ }
+
+ public T onLeaderCoordinator(Function> coordinatorApi)
+ {
+ return getResult(coordinatorApi.apply(createCoordinatorClient()));
+ }
+
+ public T onLeaderCoordinatorSync(Function coordinatorApi)
+ {
+ return coordinatorApi.apply(createCoordinatorClient());
+ }
+
+ public T onLeaderOverlord(Function> overlordApi)
+ {
+ return getResult(overlordApi.apply(createOverlordClient()));
+ }
+
+ @Nullable
+ public T onLeaderOverlord(
+ Function request,
+ TypeReference resultType
+ )
+ {
+ return makeRequest(request, resultType, overlordServiceClient, getMapper(EmbeddedOverlord.class));
+ }
+
+ public T onAnyBroker(Function> brokerApi)
+ {
+ return getResult(brokerApi.apply(createBrokerClient()));
+ }
+
+ @Nullable
+ private T makeRequest(
+ Function request,
+ TypeReference resultType,
+ ServiceClient serviceClient,
+ ObjectMapper mapper
+ )
+ {
+ final RequestBuilder requestBuilder = request.apply(mapper);
+
+ try {
+ StatusResponseHolder response = serviceClient.request(requestBuilder, responseHandler);
+ if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ throw new ISE(
+ "Request[%s] failed with status[%s] content[%s].",
+ requestBuilder.toString(),
+ response.getStatus(),
+ response.getContent()
+ );
+ }
+
+ if (resultType == null) {
+ return null;
+ } else {
+ return mapper.readValue(response.getContent(), resultType);
+ }
+ }
+ catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private CoordinatorClient createCoordinatorClient()
+ {
+ return module.makeCoordinatorClient(getMapper(EmbeddedCoordinator.class), coordinatorServiceClient);
+ }
+
+ private OverlordClient createOverlordClient()
+ {
+ return module.makeOverlordClient(getMapper(EmbeddedOverlord.class), overlordServiceClient);
+ }
+
+ private BrokerClient createBrokerClient()
+ {
+ return module.makeBrokerClient(getMapper(EmbeddedBroker.class), brokerServiceClient);
+ }
+
+ private > ObjectMapper getMapper(Class serverType)
+ {
+ return cluster.findServerOfType(serverType).bindings().jsonMapper();
+ }
+
+ private static T getResult(ListenableFuture future)
+ {
+ return FutureUtils.getUnchecked(future, true);
+ }
+}
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
index 2e878621afe8..95966e8e3903 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.Injector;
+import com.google.inject.Key;
import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.coordinator.CoordinatorClient;
@@ -29,6 +30,7 @@
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -38,6 +40,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.LatchableEmitter;
+import java.lang.annotation.Annotation;
import java.util.Objects;
/**
@@ -78,7 +81,11 @@ public final class ServerReferenceHolder implements ServerReferencesProvider
@Inject
@EscalatedGlobal
- private HttpClient httpClient;
+ private HttpClient escalatedHttpClient;
+
+ @Inject
+ @Global
+ private HttpClient globalHttpClient;
@Self
@Inject
@@ -154,7 +161,13 @@ public DruidNodeDiscoveryProvider nodeDiscovery()
@Override
public HttpClient escalatedHttpClient()
{
- return httpClient;
+ return escalatedHttpClient;
+ }
+
+ @Override
+ public HttpClient globalHttpClient()
+ {
+ return globalHttpClient;
}
@Override
@@ -168,4 +181,10 @@ public T getInstance(Class clazz)
{
return injector.getInstance(clazz);
}
+
+ @Override
+ public T getInstance(Class type, Class annotationType)
+ {
+ return injector.getInstance(Key.get(type, annotationType));
+ }
}
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
index ca24218e0fb5..c9d746d565bb 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
@@ -32,6 +32,8 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.LatchableEmitter;
+import java.lang.annotation.Annotation;
+
/**
* Provides a handle to the various objects used by an {@link EmbeddedDruidServer}
* during an embedded cluster test. The returned references should be used for
@@ -95,6 +97,11 @@ public interface ServerReferencesProvider
*/
HttpClient escalatedHttpClient();
+ /**
+ * Non-escalated {@link HttpClient} used by this server to communicate with other Druid servers.
+ */
+ HttpClient globalHttpClient();
+
/**
* {@link ObjectMapper} annotated with {@link Json}.
*/
@@ -105,4 +112,10 @@ public interface ServerReferencesProvider
* The returned object must be used for read-only purposes.
*/
T getInstance(Class clazz);
+
+ /**
+ * Gets the injected instance of the object of the specified type.
+ * The returned object must be used for read-only purposes.
+ */
+ T getInstance(Class clazz, Class annotation);
}
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java b/services/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java
index 7af3d3a7c7a2..9f6ae4713765 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java
@@ -19,11 +19,36 @@
package org.apache.druid.testing.embedded.indexing;
+import com.google.common.base.Throwables;
+import org.apache.druid.java.util.common.ISE;
+
+import java.io.File;
+import java.net.URL;
+
/**
* Constants and utility methods used in embedded cluster tests.
*/
public class Resources
{
+ /**
+ * Returns the {@link File} for the given local resource.
+ */
+ public static File getFileForResource(String resourceName)
+ {
+ final URL resourceUrl = DataFile.class.getClassLoader().getResource(resourceName);
+ if (resourceUrl == null) {
+ throw new ISE("Could not find resource file[%s]", resourceName);
+ }
+
+ try {
+ return new File(resourceUrl.toURI());
+ }
+ catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+
public static class InlineData
{
/**
@@ -67,9 +92,20 @@ public static class InlineData
public static class DataFile
{
- public static final String TINY_WIKI_1_JSON = "data/json/tiny_wiki_1.json";
- public static final String TINY_WIKI_2_JSON = "data/json/tiny_wiki_2.json";
- public static final String TINY_WIKI_3_JSON = "data/json/tiny_wiki_3.json";
+ public static File tinyWiki1Json()
+ {
+ return getFileForResource("data/json/tiny_wiki_1.json");
+ }
+
+ public static File tinyWiki2Json()
+ {
+ return getFileForResource("data/json/tiny_wiki_2.json");
+ }
+
+ public static File tinyWiki3Json()
+ {
+ return getFileForResource("data/json/tiny_wiki_3.json");
+ }
}
/**