Skip to content

Commit a6be0c1

Browse files
committed
do not close provided executor
1 parent 7a15ecb commit a6be0c1

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ public class Client implements AutoCloseable {
145145
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
146146
private final ExecutorService sharedOperationExecutor;
147147

148+
private final boolean isSharedOpExecuterorOwned;
149+
148150
private final Map<String, ClientStatisticsHolder> globalClientStats = new ConcurrentHashMap<>();
149151

150152
private boolean useNewImplementation = false;
@@ -172,8 +174,10 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
172174

173175
boolean isAsyncEnabled = MapUtils.getFlag(this.configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey(), false);
174176
if (isAsyncEnabled && sharedOperationExecutor == null) {
177+
this.isSharedOpExecuterorOwned = true;
175178
this.sharedOperationExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("chc-operation"));
176179
} else {
180+
this.isSharedOpExecuterorOwned = false;
177181
this.sharedOperationExecutor = sharedOperationExecutor;
178182
}
179183
this.useNewImplementation = useNewImplementation;
@@ -225,12 +229,16 @@ public String getDefaultDatabase() {
225229
*/
226230
@Override
227231
public void close() {
228-
try {
229-
if (sharedOperationExecutor != null && !sharedOperationExecutor.isShutdown()) {
230-
this.sharedOperationExecutor.shutdownNow();
232+
if (isSharedOpExecuterorOwned) {
233+
try {
234+
if (sharedOperationExecutor != null && !sharedOperationExecutor.isShutdown()) {
235+
this.sharedOperationExecutor.shutdownNow();
236+
}
237+
} catch (Exception e) {
238+
LOG.error("Failed to close shared operation executor", e);
231239
}
232-
} catch (Exception e) {
233-
LOG.error("Failed to close shared operation executor", e);
240+
} else {
241+
LOG.debug("Skip closing operation executor because not owned by client");
234242
}
235243

236244
if (oldClient != null) {
@@ -777,9 +785,10 @@ public Builder useAsyncRequests(boolean async) {
777785
/**
778786
* Sets an executor for running operations. If async operations are enabled and no executor is specified
779787
* client will create a default executor.
780-
*
788+
* Executor will stay running after {@code Client#close() } is called. It is application responsibility to close
789+
* the executor.
781790
* @param executorService - executor service for async operations
782-
* @return
791+
* @return
783792
*/
784793
public Builder setSharedOperationExecutor(ExecutorService executorService) {
785794
this.sharedOperationExecutor = executorService;

client-v2/src/test/java/com/clickhouse/client/ClientTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.clickhouse.client.api.ClientException;
66
import com.clickhouse.client.api.enums.Protocol;
77
import com.clickhouse.client.api.query.GenericRecord;
8+
import com.clickhouse.client.api.query.QueryResponse;
89
import com.clickhouse.client.api.query.QuerySettings;
910
import com.clickhouse.client.api.query.Records;
1011
import com.clickhouse.client.config.ClickHouseClientOption;
@@ -19,7 +20,10 @@
1920
import java.util.List;
2021
import java.util.Map;
2122
import java.util.Optional;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
2225
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2327

2428
public class ClientTests extends BaseIntegrationTest {
2529
private static final Logger LOGGER = LoggerFactory.getLogger(ClientTests.class);
@@ -130,6 +134,25 @@ public void testSetOptions() {
130134
}
131135
}
132136

137+
@Test
138+
public void testProvidedExecutor() throws Exception {
139+
140+
ExecutorService executorService = Executors.newSingleThreadExecutor();
141+
try (Client client = newClient().useAsyncRequests(true).setSharedOperationExecutor(executorService).build()) {
142+
QueryResponse response = client.query("SELECT 1").get();
143+
response.getMetrics();
144+
} catch (Exception e) {
145+
Assert.fail("unexpected exception", e);
146+
}
147+
148+
AtomicBoolean flag = new AtomicBoolean(true);
149+
executorService.submit(() -> flag.compareAndSet(true, false));
150+
executorService.shutdown();
151+
executorService.awaitTermination(10, TimeUnit.SECONDS);
152+
153+
Assert.assertFalse(flag.get());
154+
}
155+
133156
protected Client.Builder newClient() {
134157
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
135158
boolean isSecure = isCloud();

0 commit comments

Comments
 (0)