Skip to content

Commit 175976b

Browse files
authored
Merge pull request #2356 from ClickHouse/fix_npe_no_scheduler
[client-v2] fixed NPE and added test
2 parents a5e9c34 + bf857df commit 175976b

File tree

5 files changed

+107
-56
lines changed

5 files changed

+107
-56
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import com.clickhouse.client.config.ClickHouseClientOption;
4040
import com.clickhouse.data.ClickHouseColumn;
4141
import com.clickhouse.data.ClickHouseFormat;
42-
import net.jpountz.lz4.LZ4Compressor;
4342
import net.jpountz.lz4.LZ4Factory;
4443
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
4544
import org.apache.hc.core5.http.ClassicHttpResponse;
@@ -185,31 +184,25 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
185184
} else {
186185
this.lz4Factory = LZ4Factory.fastestJavaInstance();
187186
}
187+
188+
this.serverVersion = configuration.getOrDefault(ClientConfigProperties.SERVER_VERSION.getKey(), "unknown");
188189
}
189190

190191
/**
191192
* Loads essential information about a server. Should be called after client creation.
192193
*
193194
*/
194195
public void loadServerInfo() {
195-
// only if 2 properties are set disable retrieval from server
196-
if (!this.configuration.containsKey(ClientConfigProperties.SERVER_TIMEZONE.getKey()) && !this.configuration.containsKey(ClientConfigProperties.SERVER_VERSION.getKey())) {
197-
try (QueryResponse response = this.query("SELECT currentUser() AS user, timezone() AS timezone, version() AS version LIMIT 1").get()) {
198-
try (ClickHouseBinaryFormatReader reader = this.newBinaryFormatReader(response)) {
199-
if (reader.next() != null) {
200-
this.configuration.put(ClientConfigProperties.USER.getKey(), reader.getString("user"));
201-
this.configuration.put(ClientConfigProperties.SERVER_TIMEZONE.getKey(), reader.getString("timezone"));
202-
serverVersion = reader.getString("version");
203-
}
196+
try (QueryResponse response = this.query("SELECT currentUser() AS user, timezone() AS timezone, version() AS version LIMIT 1").get()) {
197+
try (ClickHouseBinaryFormatReader reader = this.newBinaryFormatReader(response)) {
198+
if (reader.next() != null) {
199+
this.configuration.put(ClientConfigProperties.USER.getKey(), reader.getString("user"));
200+
this.configuration.put(ClientConfigProperties.SERVER_TIMEZONE.getKey(), reader.getString("timezone"));
201+
serverVersion = reader.getString("version");
204202
}
205-
} catch (Exception e) {
206-
throw new ClientException("Failed to get server info", e);
207-
}
208-
} else {
209-
LOG.info("Using server version " + this.configuration.get(ClientConfigProperties.SERVER_VERSION.getKey()) + " and timezone " + this.configuration.get(ClientConfigProperties.SERVER_TIMEZONE.getKey()) );
210-
if (this.configuration.containsKey(ClientConfigProperties.SERVER_VERSION.getKey())) {
211-
serverVersion = this.configuration.get(ClientConfigProperties.SERVER_VERSION.getKey());
212203
}
204+
} catch (Exception e) {
205+
throw new ClientException("Failed to get server info", e);
213206
}
214207
}
215208

@@ -1213,8 +1206,11 @@ public boolean ping() {
12131206
*/
12141207
public boolean ping(long timeout) {
12151208
long startTime = System.nanoTime();
1216-
try (QueryResponse response = query("SELECT 1 FORMAT TabSeparated").get(timeout, TimeUnit.MILLISECONDS)) {
1217-
return true;
1209+
try {
1210+
CompletableFuture<QueryResponse> future = query("SELECT 1 FORMAT TabSeparated");
1211+
try (QueryResponse response = timeout > 0 ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get()) {
1212+
return true;
1213+
}
12181214
} catch (Exception e) {
12191215
LOG.debug("Failed to connect to the server (Duration: {})", System.nanoTime() - startTime, e);
12201216
return false;
@@ -2153,7 +2149,11 @@ private void applyDefaults(QuerySettings settings) {
21532149

21542150
private <T> CompletableFuture<T> runAsyncOperation(Supplier<T> resultSupplier, Map<String, Object> requestSettings) {
21552151
boolean isAsync = MapUtils.getFlag(requestSettings, configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey());
2156-
return isAsync ? CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor) : CompletableFuture.completedFuture(resultSupplier.get());
2152+
if (isAsync) {
2153+
return sharedOperationExecutor == null ? CompletableFuture.supplyAsync(resultSupplier) :
2154+
CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor);
2155+
}
2156+
return CompletableFuture.completedFuture(resultSupplier.get());
21572157
}
21582158

21592159
@Override

client-v2/src/test/java/com/clickhouse/client/ClientTests.java

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
public class ClientTests extends BaseIntegrationTest {
2828
private static final Logger LOGGER = LoggerFactory.getLogger(ClientTests.class);
2929

30-
@Test(dataProvider = "clientProvider")
30+
@Test(groups = {"integration"}, dataProvider = "secureClientProvider")
3131
public void testAddSecureEndpoint(Client client) {
3232
if (isCloud()) {
3333
return; // will fail in other tests
@@ -52,31 +52,35 @@ public void testAddSecureEndpoint(Client client) {
5252
}
5353
}
5454

55-
@DataProvider(name = "clientProvider")
56-
private static Client[] secureClientProvider() throws Exception {
55+
@DataProvider
56+
public static Object[][] secureClientProvider() throws Exception {
5757
ClickHouseNode node = ClickHouseServerForTest.getClickHouseNode(ClickHouseProtocol.HTTP,
5858
true, ClickHouseNode.builder()
5959
.addOption(ClickHouseClientOption.SSL_MODE.getKey(), "none")
6060
.addOption(ClickHouseClientOption.SSL.getKey(), "true").build());
61-
return new Client[]{
62-
new Client.Builder()
63-
.addEndpoint("https://" + node.getHost() + ":" + node.getPort())
64-
.setUsername("default")
65-
.setPassword("")
66-
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
67-
.build(),
68-
new Client.Builder()
69-
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), true)
70-
.setUsername("default")
71-
.setPassword("")
72-
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
73-
.setClientKey("user.key")
74-
.setClientCertificate("user.crt")
75-
.build()
61+
return new Client[][]{
62+
{
63+
new Client.Builder()
64+
.addEndpoint("https://" + node.getHost() + ":" + node.getPort())
65+
.setUsername("default")
66+
.setPassword("")
67+
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
68+
.build()
69+
},
70+
{
71+
new Client.Builder()
72+
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), true)
73+
.setUsername("default")
74+
.setPassword("")
75+
.setRootCertificate("containers/clickhouse-server/certs/localhost.crt")
76+
.setClientKey("user.key")
77+
.setClientCertificate("user.crt")
78+
.build()
79+
}
7680
};
7781
}
7882

79-
@Test
83+
@Test(groups = {"integration"})
8084
public void testRawSettings() {
8185
Client client = newClient()
8286
.setOption("custom_setting_1", "value_1")
@@ -102,33 +106,39 @@ public void testRawSettings() {
102106
}
103107
}
104108

105-
@Test
109+
@Test(groups = {"integration"})
106110
public void testPing() {
107111
try (Client client = newClient().build()) {
108112
Assert.assertTrue(client.ping());
109113
}
110114
}
111115

112-
@Test
116+
@Test(groups = {"integration"})
113117
public void testPingUnpooled() {
114118
try (Client client = newClient().enableConnectionPool(false).build()) {
115119
Assert.assertTrue(client.ping());
116120
}
117121
}
118122

119-
@Test
123+
@Test(groups = {"integration"})
120124
public void testPingFailure() {
121125
try (Client client = new Client.Builder()
122126
.addEndpoint("http://localhost:12345")
123127
.setUsername("default")
124128
.setPassword("")
125-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
126129
.build()) {
127130
Assert.assertFalse(client.ping(TimeUnit.SECONDS.toMillis(20)));
128131
}
129132
}
130133

131-
@Test
134+
@Test(groups = {"integration"})
135+
public void testPingAsync() {
136+
try (Client client = newClient().useAsyncRequests(true).build()) {
137+
Assert.assertTrue(client.ping());
138+
}
139+
}
140+
141+
@Test(groups = {"integration"})
132142
public void testSetOptions() {
133143
Map<String, String> options = new HashMap<>();
134144
String productName = "my product_name (version 1.0)";
@@ -140,7 +150,7 @@ public void testSetOptions() {
140150
}
141151
}
142152

143-
@Test
153+
@Test(groups = {"integration"})
144154
public void testProvidedExecutor() throws Exception {
145155

146156
ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -159,19 +169,19 @@ public void testProvidedExecutor() throws Exception {
159169
Assert.assertFalse(flag.get());
160170
}
161171

162-
@Test
172+
@Test(groups = {"integration"})
163173
public void testLoadingServerContext() throws Exception {
164174
long start = System.nanoTime();
165175
try (Client client = newClient().build()) {
166176
long initTime = (System.nanoTime() - start) / 1_000_000;
167177
Assert.assertTrue(initTime < 100);
168-
Assert.assertNull(client.getServerVersion());
178+
Assert.assertEquals(client.getServerVersion(), "unknown");
169179
client.loadServerInfo();
170180
Assert.assertNotNull(client.getServerVersion());
171181
}
172182
}
173183

174-
@Test
184+
@Test(groups = {"integration"})
175185
public void testDisableNative() {
176186
try (Client client = newClient().disableNativeCompression(true).build()) {
177187
Assert.assertTrue(client.toString().indexOf("JavaUnsafe") != -1);
@@ -185,7 +195,6 @@ protected Client.Builder newClient() {
185195
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure)
186196
.setUsername("default")
187197
.setPassword(ClickHouseServerForTest.getPassword())
188-
.setDefaultDatabase(ClickHouseServerForTest.getDatabase())
189-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"));
198+
.setDefaultDatabase(ClickHouseServerForTest.getDatabase());
190199
}
191200
}

client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public void testConnectionRequestTimeout() {
203203
}
204204
}
205205

206-
@Test
206+
@Test(groups = {"integration"})
207207
public void testConnectionReuseStrategy() {
208208
if (isCloud()) {
209209
return; // mocked server

client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.clickhouse.client.ClickHouseProtocol;
66
import com.clickhouse.client.ClickHouseServerForTest;
77
import com.clickhouse.client.api.Client;
8+
import com.clickhouse.client.api.ClientConfigProperties;
89
import com.clickhouse.client.api.ClientException;
910
import com.clickhouse.client.api.DataTypeUtils;
1011
import com.clickhouse.client.api.command.CommandResponse;
@@ -26,6 +27,7 @@
2627
import com.clickhouse.data.ClickHouseFormat;
2728
import com.clickhouse.data.ClickHouseVersion;
2829
import com.clickhouse.data.format.BinaryStreamUtils;
30+
import lombok.Data;
2931
import net.jpountz.lz4.LZ4Compressor;
3032
import net.jpountz.lz4.LZ4Factory;
3133
import net.jpountz.lz4.LZ4SafeDecompressor;
@@ -269,6 +271,43 @@ public void insertRawData() throws Exception {
269271
assertEquals(records.size(), 1000);
270272
}
271273

274+
@Test(groups = { "integration" }, dataProvider = "insertRawDataAsyncProvider", dataProviderClass = InsertTests.class)
275+
public void insertRawDataAsync(boolean async) throws Exception {
276+
final String tableName = "raw_data_table_async";
277+
final String createSQL = "CREATE TABLE " + tableName +
278+
" (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()";
279+
280+
initTable(tableName, createSQL);
281+
282+
InsertSettings localSettings = new InsertSettings(settings.getAllSettings());
283+
localSettings.setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), async);
284+
ByteArrayOutputStream data = new ByteArrayOutputStream();
285+
PrintWriter writer = new PrintWriter(data);
286+
for (int i = 0; i < 1000; i++) {
287+
writer.printf("%d\t%s\t%s\t%d\t%s\n", i, "2021-01-01 00:00:00", "name" + i, i, "p2");
288+
}
289+
writer.flush();
290+
client.insert(tableName, new ByteArrayInputStream(data.toByteArray()),
291+
ClickHouseFormat.TSV, localSettings).whenComplete((response, throwable) -> {
292+
OperationMetrics metrics = response.getMetrics();
293+
assertEquals((int)response.getWrittenRows(), 1000 );
294+
295+
List<GenericRecord> records = client.queryAll("SELECT * FROM " + tableName);
296+
assertEquals(records.size(), 1000);
297+
assertTrue(Thread.currentThread().getName()
298+
.startsWith(async ? "ForkJoinPool.commonPool" : "main"), "Threads starts with " + Thread.currentThread().getName());
299+
})
300+
.join(); // wait operation complete. only for tests
301+
}
302+
303+
@DataProvider
304+
public static Object[][] insertRawDataAsyncProvider(){
305+
return new Object[][] {
306+
{true}, // async
307+
{false} // blocking
308+
};
309+
}
310+
272311
@Test(groups = { "integration" }, dataProvider = "insertRawDataSimpleDataProvider", dataProviderClass = InsertTests.class)
273312
public void insertRawDataSimple(String tableName) throws Exception {
274313
// final String tableName = "raw_data_table";
@@ -639,10 +678,9 @@ public void testCollectionInsert() throws Exception {
639678
}
640679
}
641680

642-
643-
static {
644-
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
645-
}
681+
// static {
682+
// System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
683+
// }
646684

647685
@Test(groups = {"integration"}, dataProvider = "testAppCompressionDataProvider", dataProviderClass = InsertTests.class)
648686
public void testAppCompression(String algo) throws Exception {

jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,11 @@ public ConnectionImpl(String url, Properties info) throws SQLException {
9595
this.client = this.config.applyClientProperties(new Client.Builder())
9696
.setClientName(clientName)
9797
.build();
98-
this.client.loadServerInfo();
98+
String serverTimezone = this.client.getServerTimeZone();
99+
if (serverTimezone == null) {
100+
// we cannot operate without timezone
101+
this.client.loadServerInfo();
102+
}
99103
this.schema = client.getDefaultDatabase();
100104
this.defaultQuerySettings = new QuerySettings()
101105
.serverSetting(ServerSettings.ASYNC_INSERT, "0")

0 commit comments

Comments
 (0)