Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.clickhouse.client.api.internal.CommonSettings;
import org.apache.hc.core5.http.HttpHeaders;

import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Map;

Expand All @@ -27,6 +28,11 @@ public InsertSettings(Map<String, Object> settings) {
}
}

private InsertSettings(CommonSettings settings) {
this.settings = settings;
setDefaults();
}

private void setDefaults() {// Default settings, for now a very small list
this.setInputStreamCopyBufferSize(DEFAULT_INPUT_STREAM_BATCH_SIZE);
}
Expand Down Expand Up @@ -274,4 +280,28 @@ public InsertSettings logComment(String logComment) {
public String getLogComment() {
return settings.getLogComment();
}

public static InsertSettings merge(InsertSettings source, InsertSettings override) {
CommonSettings mergedSettings = source.settings.copyAndMerge(override.settings);
InsertSettings insertSettings = new InsertSettings(mergedSettings);
insertSettings.setInputStreamCopyBufferSize(override.getInputStreamCopyBufferSize());
return insertSettings;
}

/**
* Sets a network operation timeout.
* @param timeout
* @param unit
*/
public void setNetworkTimeout(long timeout, ChronoUnit unit) {
settings.setNetworkTimeout(timeout, unit);
}

/**
* Returns network timeout. Zero value is returned if no timeout is set.
* @return timeout in ms.
*/
public Long getNetworkTimeout() {
return settings.getNetworkTimeout();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientConfigProperties;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -217,6 +219,25 @@ public String getLogComment() {
return logComment;
}

/**
* Sets a network operation timeout.
* @param timeout
* @param unit
*/
public void setNetworkTimeout(long timeout, ChronoUnit unit) {
settings.put(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(), Duration.of(timeout, unit).toMillis());
}

/**
* Returns network timeout. Zero value is returned if no timeout is set.
* @return timeout in ms.
*/
public Long getNetworkTimeout() {
return (Long) getOption(ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getKey(),
ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getDefaultValue());
}


public CommonSettings copyAndMerge(CommonSettings override) {
CommonSettings copy = new CommonSettings();
copy.settings.putAll(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ public class HttpAPIClientHelper {

private final CloseableHttpClient httpClient;

private final RequestConfig baseRequestConfig;

private String proxyAuthHeaderValue;

private final Set<ClientFaultCause> defaultRetryCauses;
Expand All @@ -125,11 +123,6 @@ public HttpAPIClientHelper(Map<String, Object> configuration, Object metricsRegi
this.metricsRegistry = metricsRegistry;
this.httpClient = createHttpClient(initSslContext, configuration);

RequestConfig.Builder reqConfBuilder = RequestConfig.custom();
reqConfBuilder.setConnectionRequestTimeout(ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(configuration), TimeUnit.MILLISECONDS);

this.baseRequestConfig = reqConfBuilder.build();

boolean usingClientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(configuration);
boolean usingServerCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(configuration);
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(configuration);
Expand Down Expand Up @@ -438,12 +431,19 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig);
boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig);

req.setConfig(baseRequestConfig);

// setting entity. wrapping if compression is enabled
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback),
clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig));

HttpClientContext context = HttpClientContext.create();
Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig);
Number connectionReqTimeout = ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT.getOrDefault(requestConfig);
RequestConfig reqHttpConf = RequestConfig.custom()
.setResponseTimeout(responseTimeout.longValue(), TimeUnit.MILLISECONDS)
.setConnectionRequestTimeout(connectionReqTimeout.longValue(), TimeUnit.MILLISECONDS)
.build();
context.setRequestConfig(reqHttpConf);

ClassicHttpResponse httpResponse = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.clickhouse.client.api.internal.ValidationUtils;
import com.clickhouse.data.ClickHouseFormat;

import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

Expand Down Expand Up @@ -269,6 +269,23 @@ public String getLogComment() {
return settings.getLogComment();
}

/**
* Sets a network operation timeout.
* @param timeout
* @param unit
*/
public void setNetworkTimeout(long timeout, ChronoUnit unit) {
settings.setNetworkTimeout(timeout, unit);
}

/**
* Returns network timeout. Zero value is returned if no timeout is set.
* @return timeout in ms.
*/
public Long getNetworkTimeout() {
return settings.getNetworkTimeout();
}

public static QuerySettings merge(QuerySettings source, QuerySettings override) {
CommonSettings mergedSettings = source.settings.copyAndMerge(override.settings);
return new QuerySettings(mergedSettings);
Expand Down
41 changes: 34 additions & 7 deletions client-v2/src/test/java/com/clickhouse/client/SettingsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Test(groups = {"unit"})
public class SettingsTests {
Expand All @@ -22,15 +24,28 @@ void testClientSettings() {
}

@Test
void testMergeQuerySettings() {
QuerySettings settings1 = new QuerySettings().setQueryId("test1").httpHeader("key1", "value1");
QuerySettings settings2 = new QuerySettings().httpHeader("key1", "value2");
void testMergeSettings() {
{
QuerySettings settings1 = new QuerySettings().setQueryId("test1").httpHeader("key1", "value1");
QuerySettings settings2 = new QuerySettings().httpHeader("key1", "value2");

QuerySettings merged = QuerySettings.merge(settings1, settings2);
Assert.assertNotSame(merged, settings1);
Assert.assertNotSame(merged, settings2);

QuerySettings merged = QuerySettings.merge(settings1, settings2);
Assert.assertNotSame(merged, settings1);
Assert.assertNotSame(merged, settings2);
Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2");
}
{
InsertSettings settings1 = new InsertSettings().setQueryId("test1").httpHeader("key1", "value1");
InsertSettings settings2 = new InsertSettings().httpHeader("key1", "value2").setInputStreamCopyBufferSize(200000);

Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2");
InsertSettings merged = InsertSettings.merge(settings1, settings2);
Assert.assertNotSame(merged, settings1);
Assert.assertNotSame(merged, settings2);

Assert.assertEquals(merged.getInputStreamCopyBufferSize(), settings2.getInputStreamCopyBufferSize());
Assert.assertEquals(merged.getAllSettings().get(ClientConfigProperties.httpHeader("key1")), "value2");
}
}

@Test
Expand Down Expand Up @@ -87,6 +102,12 @@ void testQuerySettingsSpecific() throws Exception {
settings.logComment(null);
Assert.assertNull(settings.getLogComment());
}

{
final QuerySettings settings = new QuerySettings();
settings.setNetworkTimeout(10, ChronoUnit.SECONDS);
Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10));
}
}

@Test
Expand Down Expand Up @@ -140,5 +161,11 @@ public void testInsertSettingsSpecific() throws Exception {
settings.logComment(null);
Assert.assertNull(settings.getLogComment());
}

{
final InsertSettings settings = new InsertSettings();
settings.setNetworkTimeout(10, ChronoUnit.SECONDS);
Assert.assertEquals(settings.getNetworkTimeout(), TimeUnit.SECONDS.toMillis(10));
}
}
}
67 changes: 50 additions & 17 deletions jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.sql.Statement;
import java.sql.Struct;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
Expand All @@ -46,7 +47,7 @@
import java.util.stream.Collectors;

public class ConnectionImpl implements Connection, JdbcV2Wrapper {
private static final Logger log = LoggerFactory.getLogger(ConnectionImpl.class);
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImpl.class);

protected final String url;
private final Client client; // this member is private to force using getClient()
Expand All @@ -65,6 +66,8 @@ public class ConnectionImpl implements Connection, JdbcV2Wrapper {

private final SqlParser sqlParser;

private Executor networkTimeoutExecutor;

public ConnectionImpl(String url, Properties info) throws SQLException {
try {
this.url = url;//Raw URL
Expand All @@ -85,10 +88,10 @@ public ConnectionImpl(String url, Properties info) throws SQLException {
}

if (this.config.isDisableFrameworkDetection()) {
log.debug("Framework detection is disabled.");
LOG.debug("Framework detection is disabled.");
} else {
String detectedFrameworks = Driver.FrameworksDetection.getFrameworksDetected();
log.debug("Detected frameworks: {}", detectedFrameworks);
LOG.debug("Detected frameworks: {}", detectedFrameworks);
if (!detectedFrameworks.trim().isEmpty()) {
clientName += " (" + detectedFrameworks + ")";
}
Expand Down Expand Up @@ -213,9 +216,8 @@ public void close() throws SQLException {
if (isClosed()) {
return;
}

client.close();
closed = true;
closed = true; // mark as closed to prevent further invocations
client.close(); // this will disrupt pending requests.
}

@Override
Expand Down Expand Up @@ -600,27 +602,58 @@ public String getSchema() throws SQLException {

@Override
public void abort(Executor executor) throws SQLException {
if (!config.isIgnoreUnsupportedRequests()) {
throw new SQLFeatureNotSupportedException("abort not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED);
if (executor == null) {
throw new SQLException("Executor must be not null");
}
// This method should check permissions with SecurityManager but the one is deprecated.
// There is no replacement for SecurityManger and it is marked for removal.
this.close();
}

@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
//TODO: Should this be supported?
if (!config.isIgnoreUnsupportedRequests()) {
throw new SQLFeatureNotSupportedException("setNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED);
ensureOpen();

// Very good mail thread about this method implementation. https://mail.openjdk.org/pipermail/jdbc-spec-discuss/2017-November/000236.html

// This method should check permissions with SecurityManager but the one is deprecated.
// There is no replacement for SecurityManger and it is marked for removal.
if (milliseconds > 0 && executor == null) {
// we need executor only for positive timeout values.
throw new SQLException("Executor must be not null");
}
if (milliseconds < 0) {
throw new SQLException("Timeout must be >= 0");
}

// How it should work:
// if timeout is set with this method then any timeout exception should be reported to the connection
// when connection get signal about timeout it uses executor to abort itself
// Some connection pools set timeout before calling Connection#close() to ensure that this operation will not hang
// Socket timeout is propagated with QuerySettings this connection has.
networkTimeoutExecutor = executor;
defaultQuerySettings.setNetworkTimeout(milliseconds, ChronoUnit.MILLIS);
}


// Should be called by child object to notify about timeout.
public synchronized void onNetworkTimeout() {
if (this.closed || networkTimeoutExecutor == null) {
return; // we closed already or have not set network timeout so do nothing.
}

networkTimeoutExecutor.execute(() -> {
try {
this.abort(networkTimeoutExecutor);
} catch (SQLException e) {
throw new RuntimeException("Failed to abort connection", e);
}
});
}

@Override
public int getNetworkTimeout() throws SQLException {
//TODO: Should this be supported?
if (!config.isIgnoreUnsupportedRequests()) {
throw new SQLFeatureNotSupportedException("getNetworkTimeout not supported", ExceptionUtils.SQL_STATE_FEATURE_NOT_SUPPORTED);
}

return -1;
return defaultQuerySettings.getNetworkTimeout().intValue();
}

/**
Expand Down
Loading
Loading