Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,17 @@
import org.apache.hc.client5.http.impl.auth.DigestSchemeFactory;
import org.apache.hc.client5.http.impl.auth.SPNegoSchemeFactory;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.NoHttpResponseException;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.config.Registry;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
Expand Down Expand Up @@ -84,9 +85,9 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient, HttpClie
private static AuthScope anyAuthScope = new AuthScope(null, -1);

protected final URI uri;
protected HttpHost httpHost;
protected BasicAuthCache authCache;
protected CloseableHttpClient client;
protected Registry<ConnectionSocketFactory> socketFactoryRegistry;
protected PoolingHttpClientConnectionManager pool;

protected UsernamePasswordCredentials credentials = null;
Expand Down Expand Up @@ -130,6 +131,8 @@ protected void initializeClient(PoolingHttpClientConnectionManager pool,
private RequestConfig createRequestConfig() {
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
requestConfigBuilder
// We cannot avoid this. If the timeout were defined on the pool, then
// it couldn't be overridden later
.setConnectTimeout(this.connectTimeout, TimeUnit.MILLISECONDS)
.setResponseTimeout(this.responseTimeout, TimeUnit.MILLISECONDS);
List<String> preferredSchemes = new ArrayList<>();
Expand All @@ -153,12 +156,19 @@ private RequestConfig createRequestConfig() {
@Override public byte[] send(byte[] request) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer an approach like this for adapting to the new ensureOpen(...) syntax since it avoids the need for any pass-by-instance semantics:

@Override public byte[] send(byte[] request) {
  // Create the client with the AuthSchemeRegistry and manager
  HttpPost post = new HttpPost(uri);
  try {
    HttpHost host = RoutingSupport.determineHost(post);

    while (true) {
      ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM);

      post.setEntity(entity);

      try (ClassicHttpResponse response = executeOpen(host, post, context)) {
        final int statusCode = response.getCode();
        if (HttpURLConnection.HTTP_OK == statusCode
            || HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) {
          userToken = context.getUserToken();
          return EntityUtils.toByteArray(response.getEntity());
        } else if (HttpURLConnection.HTTP_UNAVAILABLE == statusCode) {
          LOG.debug("Failed to connect to server (HTTP/503), retrying");
          continue;
        }

        throw new RuntimeException(
            "Failed to execute HTTP Request, got HTTP/" + statusCode);
      } catch (NoHttpResponseException e) {
        // This can happen when sitting behind a load balancer and a backend server dies
        LOG.debug("The server failed to issue an HTTP response, retrying");
        continue;
      }
    }
  } catch (RuntimeException e) {
    throw e;
  } catch (Exception e) {
    LOG.debug("Failed to execute HTTP request", e);
    throw new RuntimeException(e);
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate @chrisdennis ?

I couldn't find ensureOpen() in either the apache HttpClient or Avatica codebase.
Google found it in some nio implementation classes, but I don't see how that's related.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you mean using the execute() calls that pass a Handler, those don't return error codes, and would make implementing the retry logic unneccarily convoluted.

The response is already in a try-with-resources block, so there is no danger of leakage.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - I've no idea why I wrote ensureOpen... you're right I mean't executeOpen(...). I'm not suggesting migrating to execute(...) calls though. I meant that if you formulate things like above then you don't need to compute the HttpHost in advance using a "dummy" HttpPost/URI, and you don't need to pass the result in via the new instance variable, you can just reorder things slightly, and keep everything contained within the call stack.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification.

Yes, that would work, but we're currently caching and pre-computing as much as possible.
Your version is slightly less efficient, as it needs to call RoutingSupport.determineHost for each send() call.

But we can avoid the Dummy post by postponing the host detection, I will do that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the dummy post, please re-check @chrisdennis .

while (true) {
ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM);

// Create the client with the AuthSchemeRegistry and manager
HttpPost post = new HttpPost(uri);
post.setEntity(entity);

try (CloseableHttpResponse response = execute(post, context)) {
if (httpHost == null) {
try {
httpHost = RoutingSupport.determineHost(post);
} catch (HttpException e) {
LOG.debug("Failed to execute HTTP request", e);
throw new RuntimeException("Could not determine Http Host from URI", e);
}
}

try (ClassicHttpResponse response = executeOpen(httpHost, post, context)) {
final int statusCode = response.getCode();
if (HttpURLConnection.HTTP_OK == statusCode
|| HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) {
Expand All @@ -185,9 +195,9 @@ private RequestConfig createRequestConfig() {
}

// Visible for testing
CloseableHttpResponse execute(HttpPost post, HttpClientContext context)
ClassicHttpResponse executeOpen(HttpHost httpHost, HttpPost post, HttpClientContext context)
throws IOException, ClientProtocolException {
return client.execute(post, context);
return client.executeOpen(httpHost, post, context);
}

@Override public void setUsernamePassword(AuthenticationType authType, String username,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
import org.apache.calcite.avatica.remote.HostnameVerificationConfigurable.HostnameVerification;

import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.client5.http.ssl.HttpsSupport;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.http.config.Registry;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.client5.http.ssl.TlsSocketStrategy;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.ssl.SSLContexts;

Expand Down Expand Up @@ -71,36 +69,24 @@ public static PoolingHttpClientConnectionManager getPool(ConnectionConfig config
}

private static PoolingHttpClientConnectionManager setupPool(ConnectionConfig config) {
Registry<ConnectionSocketFactory> csfr = createCSFRegistry(config);
PoolingHttpClientConnectionManager pool = new PoolingHttpClientConnectionManager(csfr);
final String maxCnxns =
System.getProperty(MAX_POOLED_CONNECTIONS_KEY, MAX_POOLED_CONNECTIONS_DEFAULT);
pool.setMaxTotal(Integer.parseInt(maxCnxns));
// Increase default max connection per route to 25
final String maxCnxns = System.getProperty(MAX_POOLED_CONNECTIONS_KEY,
MAX_POOLED_CONNECTIONS_DEFAULT);
final String maxCnxnsPerRoute = System.getProperty(MAX_POOLED_CONNECTION_PER_ROUTE_KEY,
MAX_POOLED_CONNECTION_PER_ROUTE_DEFAULT);
pool.setDefaultMaxPerRoute(Integer.parseInt(maxCnxnsPerRoute));
PoolingHttpClientConnectionManager pool = PoolingHttpClientConnectionManagerBuilder.create()
.setTlsSocketStrategy(createTlsSocketStrategy(config))
.setMaxConnTotal(Integer.parseInt(maxCnxns))
.setMaxConnPerRoute(Integer.parseInt(maxCnxnsPerRoute)).build();
LOG.debug("Created new pool {}", pool);
return pool;
}

private static Registry<ConnectionSocketFactory> createCSFRegistry(ConnectionConfig config) {
RegistryBuilder<ConnectionSocketFactory> registryBuilder = RegistryBuilder.create();
configureHttpRegistry(registryBuilder);
configureHttpsRegistry(registryBuilder, config);

return registryBuilder.build();
}

private static void configureHttpsRegistry(
RegistryBuilder<ConnectionSocketFactory> registryBuilder, ConnectionConfig config) {
private static TlsSocketStrategy createTlsSocketStrategy(ConnectionConfig config) {
try {
SSLContext sslContext = getSSLContext(config);
final HostnameVerifier verifier = getHostnameVerifier(config.hostnameVerification());
SSLConnectionSocketFactory sslFactory = new SSLConnectionSocketFactory(sslContext, verifier);
registryBuilder.register("https", sslFactory);
return new DefaultClientTlsStrategy(getSSLContext(config),
getHostnameVerifier(config.hostnameVerification()));
} catch (Exception e) {
LOG.error("HTTPS registry configuration failed");
LOG.error("HTTPS TlsSocketStrategy configuration failed");
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -134,11 +120,6 @@ private static void loadTrustStore(SSLContextBuilder sslContextBuilder, Connecti
LOG.info("Trustore loaded from: {}", config.truststore());
}

private static void configureHttpRegistry(
RegistryBuilder<ConnectionSocketFactory> registryBuilder) {
registryBuilder.register("http", PlainConnectionSocketFactory.getSocketFactory());
}

/**
* Creates the {@code HostnameVerifier} given the provided {@code verification}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.NoHttpResponseException;
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
import org.apache.hc.core5.http.io.entity.StringEntity;
Expand Down Expand Up @@ -76,7 +77,7 @@ public class AvaticaCommonsHttpClientImplTest {
ConnectionConfig.class));

doAnswer(failThenSucceed).when(client)
.execute(any(HttpPost.class), eq(client.context));
.executeOpen(any(HttpHost.class), any(HttpPost.class), eq(client.context));

when(badResponse.getCode()).thenReturn(HttpURLConnection.HTTP_UNAVAILABLE);

Expand Down Expand Up @@ -110,7 +111,7 @@ public class AvaticaCommonsHttpClientImplTest {
ConnectionConfig.class));

doAnswer(failThenSucceed).when(client)
.execute(any(HttpPost.class), eq(client.context));
.executeOpen(any(HttpHost.class), any(HttpPost.class), eq(client.context));

when(badResponse.getCode()).thenReturn(HttpURLConnection.HTTP_UNAVAILABLE);

Expand All @@ -136,13 +137,13 @@ public void testPersistentContextReusedAcrossRequests() throws Exception {
when(response.getEntity()).thenReturn(entity);

doReturn(response).when(client)
.execute(any(HttpPost.class), eq(client.context));
.executeOpen(any(HttpHost.class), any(HttpPost.class), eq(client.context));

client.send(new byte[0]);
client.send(new byte[0]);

// Verify that the persistent context was reused and not created again
verify(client, times(2)).execute(any(HttpPost.class),
verify(client, times(2)).executeOpen(any(HttpHost.class), any(HttpPost.class),
eq(client.context));
}

Expand All @@ -154,7 +155,7 @@ public void testPersistentContextThreadSafety() throws Exception {
ConnectionConfig.class));

doReturn(mock(CloseableHttpResponse.class)).when(client)
.execute(any(HttpPost.class), eq(client.context));
.executeOpen(any(HttpHost.class), any(HttpPost.class), eq(client.context));

Runnable requestTask = () -> {
try {
Expand All @@ -175,7 +176,8 @@ public void testPersistentContextThreadSafety() throws Exception {
thread.join();
}

verify(client, times(threadCount)).execute(any(HttpPost.class), eq(client.context));
verify(client, times(threadCount)).executeOpen(any(HttpHost.class), any(HttpPost.class),
eq(client.context));
}

}
Expand Down