Skip to content

Commit 68aaba8

Browse files
[PECO-2009] Improvements to thrift-server client (#552)
Apache HTTP Client maintains a pool of connections to mitigate the cost of a TCP handshake for every request. However, the server can unilaterally decide to close a connection to avoid resource drainage. A problem with the classical blocking network I/O model (used by Apache HTTP Client 4.x) is that a socket only reacts to incoming events while it is blocked on an operation; the client can therefore miss the event of the server closing the connection. This creates a need to remove expired/idle connections from the pool on the client side. There are broadly two ways of doing so: (1) close such connections manually in the various code paths (currently done via the `closeExpiredAndIdleConnections` method), or (2) run a background thread that evicts them. From Apache HTTP Client 4.5 onwards, the library includes [IdleConnectionEvictor](https://www.javadoc.io/doc/org.apache.httpcomponents/httpclient/4.5.5/org/apache/http/impl/client/IdleConnectionEvictor.html), so users don't have to write that background thread themselves; we can use it and avoid calling the cleanup method manually. Summarising the improvements: - use a byte array input stream for receiving the data structure - use IdleConnectionEvictor to close expired/idle connections
1 parent 385cb64 commit 68aaba8

File tree

8 files changed

+167
-334
lines changed

8 files changed

+167
-334
lines changed

src/main/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProvider.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ public ArrowResultChunk getChunk() throws DatabricksSQLException {
131131
return null;
132132
}
133133
ArrowResultChunk chunk = chunkIndexToChunksMap.get(currentChunkIndex);
134-
httpClient.closeExpiredAndIdleConnections();
135134
synchronized (chunk) {
136135
try {
137136
while (!isDownloadComplete(chunk.getStatus())) {
@@ -188,7 +187,6 @@ public void close() {
188187
this.isClosed = true;
189188
this.chunkDownloaderExecutorService.shutdownNow();
190189
this.chunkIndexToChunksMap.values().forEach(ArrowResultChunk::releaseChunk);
191-
httpClient.closeExpiredAndIdleConnections();
192190
}
193191

194192
/** Release the memory for previous chunk since it is already consumed */

src/main/java/com/databricks/jdbc/dbclient/IDatabricksHttpClient.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,4 @@ public interface IDatabricksHttpClient {
1414
* @return http response
1515
*/
1616
CloseableHttpResponse execute(HttpUriRequest request) throws DatabricksHttpException;
17-
18-
void closeExpiredAndIdleConnections();
1917
}

src/main/java/com/databricks/jdbc/dbclient/impl/http/DatabricksHttpClient.java

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.http.conn.ssl.NoopHostnameVerifier;
3434
import org.apache.http.impl.client.CloseableHttpClient;
3535
import org.apache.http.impl.client.HttpClientBuilder;
36+
import org.apache.http.impl.client.IdleConnectionEvictor;
3637
import org.apache.http.impl.conn.DefaultSchemePortResolver;
3738
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
3839
import org.apache.http.ssl.SSLContextBuilder;
@@ -52,15 +53,17 @@ public class DatabricksHttpClient implements IDatabricksHttpClient {
5253
private static PoolingHttpClientConnectionManager connectionManager;
5354
private final CloseableHttpClient httpClient;
5455
protected static int idleHttpConnectionExpiry;
55-
private CloseableHttpClient httpDisabledSSLClient;
5656
private DatabricksHttpRetryHandler retryHandler;
57+
private IdleConnectionEvictor idleConnectionEvictor;
5758

5859
private DatabricksHttpClient(IDatabricksConnectionContext connectionContext) {
5960
initializeConnectionManager(connectionContext);
6061
httpClient = makeClosableHttpClient(connectionContext);
61-
httpDisabledSSLClient = makeClosableDisabledSslHttpClient();
6262
idleHttpConnectionExpiry = connectionContext.getIdleHttpConnectionExpiry();
6363
retryHandler = new DatabricksHttpRetryHandler(connectionContext);
64+
idleConnectionEvictor =
65+
new IdleConnectionEvictor(connectionManager, idleHttpConnectionExpiry, TimeUnit.SECONDS);
66+
idleConnectionEvictor.start();
6467
}
6568

6669
@VisibleForTesting
@@ -219,30 +222,6 @@ public CloseableHttpResponse execute(HttpUriRequest request) throws DatabricksHt
219222
return null;
220223
}
221224

222-
public CloseableHttpResponse executeWithoutCertVerification(HttpUriRequest request)
223-
throws DatabricksHttpException {
224-
LOGGER.debug(
225-
String.format("Executing HTTP request [{%s}]", RequestSanitizer.sanitizeRequest(request)));
226-
try {
227-
return httpDisabledSSLClient.execute(request);
228-
} catch (Exception e) {
229-
throwHttpException(e, request, LogLevel.DEBUG);
230-
}
231-
return null;
232-
}
233-
234-
@Override
235-
public void closeExpiredAndIdleConnections() {
236-
if (connectionManager != null) {
237-
synchronized (connectionManager) {
238-
LOGGER.debug(
239-
String.format("connection pool stats: {%s}", connectionManager.getTotalStats()));
240-
connectionManager.closeExpiredConnections();
241-
connectionManager.closeIdleConnections(idleHttpConnectionExpiry, TimeUnit.SECONDS);
242-
}
243-
}
244-
}
245-
246225
static String getUserAgent() {
247226
String sdkUserAgent = UserAgent.asString();
248227
// Split the string into parts
@@ -270,6 +249,9 @@ public static synchronized void removeInstance(IDatabricksConnectionContext cont
270249
DatabricksHttpClient instance = instances.remove(contextKey);
271250
if (instance != null) {
272251
try {
252+
if (instance.idleConnectionEvictor != null) {
253+
instance.idleConnectionEvictor.shutdown();
254+
}
273255
instance.httpClient.close();
274256
} catch (IOException e) {
275257
LOGGER.debug(String.format("Caught error while closing http client. Error %s", e));

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksHttpTTransport.java

Lines changed: 57 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@
66
import com.databricks.jdbc.log.JdbcLogger;
77
import com.databricks.jdbc.log.JdbcLoggerFactory;
88
import com.google.common.annotations.VisibleForTesting;
9+
import java.io.ByteArrayInputStream;
910
import java.io.ByteArrayOutputStream;
1011
import java.io.IOException;
11-
import java.io.InputStream;
1212
import java.util.Collections;
1313
import java.util.HashMap;
1414
import java.util.Map;
15+
import org.apache.http.HttpEntity;
1516
import org.apache.http.client.methods.CloseableHttpResponse;
1617
import org.apache.http.client.methods.HttpPost;
1718
import org.apache.http.entity.ByteArrayEntity;
19+
import org.apache.http.util.EntityUtils;
1820
import org.apache.thrift.TConfiguration;
1921
import org.apache.thrift.transport.TTransport;
2022
import org.apache.thrift.transport.TTransportException;
@@ -23,20 +25,19 @@ public class DatabricksHttpTTransport extends TTransport {
2325

2426
private static final JdbcLogger LOGGER =
2527
JdbcLoggerFactory.getLogger(DatabricksHttpTTransport.class);
28+
private static final Map<String, String> DEFAULT_HEADERS =
29+
Collections.unmodifiableMap(getDefaultHeaders());
2630
private final DatabricksHttpClient httpClient;
2731
private final String url;
2832
private Map<String, String> customHeaders = Collections.emptyMap();
2933
private final ByteArrayOutputStream requestBuffer;
30-
private InputStream inputStream = null;
31-
private CloseableHttpResponse response = null;
32-
private static final Map<String, String> DEFAULT_HEADERS =
33-
Collections.unmodifiableMap(getDefaultHeaders());
34+
private ByteArrayInputStream responseBuffer;
3435

3536
public DatabricksHttpTTransport(DatabricksHttpClient httpClient, String url) {
3637
this.httpClient = httpClient;
37-
this.httpClient.closeExpiredAndIdleConnections();
3838
this.url = url;
3939
this.requestBuffer = new ByteArrayOutputStream();
40+
this.responseBuffer = null;
4041
}
4142

4243
@Override
@@ -51,65 +52,20 @@ public void open() throws TTransportException {
5152
}
5253

5354
@Override
54-
public void close() {
55-
this.httpClient.closeExpiredAndIdleConnections();
56-
if (inputStream != null) {
57-
try {
58-
inputStream.close();
59-
} catch (IOException e) {
60-
LOGGER.error(
61-
String.format("Failed to close inputStream with error {%s}. Skipping the close.", e));
62-
}
63-
inputStream = null;
64-
}
65-
if (response != null) {
66-
try {
67-
response.close();
68-
} catch (IOException e) {
69-
LOGGER.error(String.format("Failed to close response with error {%s}", e.toString()));
70-
}
71-
response = null;
72-
}
73-
}
74-
75-
public void setCustomHeaders(Map<String, String> headers) {
76-
if (headers != null) {
77-
customHeaders = new HashMap<>(headers);
78-
} else {
79-
customHeaders = Collections.emptyMap();
80-
}
81-
}
82-
83-
@VisibleForTesting
84-
Map<String, String> getCustomHeaders() {
85-
return customHeaders;
86-
}
87-
88-
@VisibleForTesting
89-
InputStream getInputStream() {
90-
return inputStream;
91-
}
92-
93-
@VisibleForTesting
94-
void setInputStream(InputStream inputStream) {
95-
this.inputStream = inputStream;
96-
}
55+
public void close() {}
9756

9857
@Override
9958
public int read(byte[] buf, int off, int len) throws TTransportException {
100-
if (inputStream == null) {
101-
throw new TTransportException("Response buffer is empty, no request.");
59+
if (responseBuffer == null) {
60+
LOGGER.error("Response buffer is empty, no response.");
61+
throw new TTransportException("Response buffer is empty, no response.");
10262
}
103-
try {
104-
int ret = inputStream.read(buf, off, len);
105-
if (ret == -1) {
106-
throw new TTransportException("No more data available.");
107-
}
108-
return ret;
109-
} catch (IOException e) {
110-
LOGGER.error(String.format("Failed to read inputStream with error {%s}", e.toString()));
111-
throw new TTransportException(e);
63+
int numBytes = responseBuffer.read(buf, off, len);
64+
if (numBytes == -1) {
65+
LOGGER.error("No data available to read.");
66+
throw new TTransportException("No more data available.");
11267
}
68+
return numBytes;
11369
}
11470

11571
@Override
@@ -119,32 +75,34 @@ public void write(byte[] buf, int off, int len) {
11975

12076
@Override
12177
public void flush() throws TTransportException {
122-
try {
123-
HttpPost request = new HttpPost(this.url);
124-
DEFAULT_HEADERS.forEach(request::addHeader);
125-
if (customHeaders != null) {
126-
customHeaders.forEach(request::addHeader);
127-
}
128-
request.setEntity(new ByteArrayEntity(requestBuffer.toByteArray()));
129-
response = httpClient.execute(request);
78+
HttpPost request = new HttpPost(this.url);
79+
DEFAULT_HEADERS.forEach(request::addHeader);
80+
81+
if (customHeaders != null) {
82+
customHeaders.forEach(request::addHeader);
83+
}
84+
85+
// Set the request entity
86+
request.setEntity(new ByteArrayEntity(requestBuffer.toByteArray()));
87+
88+
// Execute the request and handle the response
89+
try (CloseableHttpResponse response = httpClient.execute(request)) {
13090
ValidationUtil.checkHTTPError(response);
131-
inputStream = response.getEntity().getContent();
132-
requestBuffer.reset();
133-
} catch (DatabricksHttpException | IOException e) {
134-
Throwable cause = e;
135-
while (cause != null) {
136-
if (cause instanceof DatabricksHttpException) {
137-
throw new TTransportException(
138-
TTransportException.UNKNOWN, "Failed to flush data to server: " + cause.getMessage());
139-
}
140-
cause = cause.getCause();
141-
}
142-
httpClient.closeExpiredAndIdleConnections();
14391

92+
// Read the response
93+
HttpEntity entity = response.getEntity();
94+
if (entity != null) {
95+
byte[] responseBytes = EntityUtils.toByteArray(entity);
96+
responseBuffer = new ByteArrayInputStream(responseBytes);
97+
}
98+
} catch (DatabricksHttpException | IOException e) {
14499
String errorMessage = "Failed to flush data to server: " + e.getMessage();
145-
LOGGER.error(errorMessage);
100+
LOGGER.error(e, errorMessage);
146101
throw new TTransportException(TTransportException.UNKNOWN, errorMessage);
147102
}
103+
104+
// Reset the request buffer
105+
requestBuffer.reset();
148106
}
149107

150108
@Override
@@ -158,10 +116,28 @@ public void updateKnownMessageSize(long size) throws TTransportException {}
158116
@Override
159117
public void checkReadBytesAvailable(long numBytes) throws TTransportException {}
160118

119+
public void setCustomHeaders(Map<String, String> headers) {
120+
if (headers != null) {
121+
customHeaders = new HashMap<>(headers);
122+
} else {
123+
customHeaders = Collections.emptyMap();
124+
}
125+
}
126+
161127
private static Map<String, String> getDefaultHeaders() {
162128
Map<String, String> headers = new HashMap<>();
163129
headers.put("Content-Type", "application/x-thrift");
164130
headers.put("Accept", "application/x-thrift");
165131
return headers;
166132
}
133+
134+
@VisibleForTesting
135+
Map<String, String> getCustomHeaders() {
136+
return customHeaders;
137+
}
138+
139+
@VisibleForTesting
140+
void setResponseBuffer(ByteArrayInputStream responseBuffer) {
141+
this.responseBuffer = responseBuffer;
142+
}
167143
}

0 commit comments

Comments
 (0)