Skip to content

Commit c9a1b2d

Browse files
committed
Merge branch 'main' into clientv2_primitive_reader
2 parents 8e1e2ef + 5ceb43d commit c9a1b2d

File tree

21 files changed

+516
-192
lines changed

21 files changed

+516
-192
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 118 additions & 78 deletions
Large diffs are not rendered by default.

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -329,12 +329,9 @@ public ZonedDateTime getZonedDateTime(String colName) {
329329
switch (column.getDataType()) {
330330
case DateTime:
331331
case DateTime64:
332-
LocalDateTime dateTime = readValue(colName);
333-
return dateTime.atZone(column.getTimeZone().toZoneId());
334332
case Date:
335333
case Date32:
336-
LocalDate data = readValue(colName);
337-
return data.atStartOfDay(column.getTimeZone().toZoneId());
334+
return readValue(colName);
338335
}
339336

340337
throw new ClientException("Column of type " + column.getDataType() + " cannot be converted to Instant");

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import com.clickhouse.client.api.ClientException;
44
import com.clickhouse.data.ClickHouseColumn;
55
import com.clickhouse.data.ClickHouseDataType;
6+
import com.clickhouse.data.format.BinaryStreamUtils;
7+
import com.clickhouse.data.value.ClickHouseBitmap;
68
import org.slf4j.Logger;
79
import org.slf4j.helpers.NOPLogger;
810

@@ -213,11 +215,13 @@ public <T> T readValue(ClickHouseColumn column, Class<?> typeHint) throws IOExce
213215
case Nothing:
214216
return null;
215217
// case SimpleAggregateFunction:
216-
// case AggregateFunction:
218+
case AggregateFunction:
219+
return (T) ClickHouseBitmap.deserialize(input, column.getNestedColumns().get(0).getDataType());
217220
default:
218221
throw new IllegalArgumentException("Unsupported data type: " + column.getDataType());
219222
}
220223
} catch (EOFException e) {
224+
log.info("End of stream reached before reading all data for column " + column.getColumnName());
221225
throw e;
222226
} catch (Exception e) {
223227
throw new ClientException("Failed to read value for column " + column.getColumnName(), e);

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/MapBackedRecord.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,9 @@ public ZonedDateTime getZonedDateTime(String colName) {
159159
switch (column.getDataType()) {
160160
case DateTime:
161161
case DateTime64:
162-
LocalDateTime dateTime = readValue(colName);
163-
return dateTime.atZone(column.getTimeZone().toZoneId());
164162
case Date:
165163
case Date32:
166-
LocalDate data = readValue(colName);
167-
return data.atStartOfDay(column.getTimeZone().toZoneId());
164+
return readValue(colName);
168165
}
169166

170167
throw new ClientException("Column of type " + column.getDataType() + " cannot be converted to Instant");
@@ -449,36 +446,40 @@ public short getEnum16(int index) {
449446
}
450447

451448
@Override
452-
public LocalDate getLocalDate(String colName) {
453-
Object value = readValue(colName);
454-
if (value instanceof LocalDateTime) {
455-
return ((LocalDateTime) value).toLocalDate();
449+
public LocalDate getLocalDate(int index) {
450+
Object value = readValue(index);
451+
if (value instanceof ZonedDateTime) {
452+
return ((ZonedDateTime) value).toLocalDate();
456453
}
457454
return (LocalDate) value;
458-
459455
}
460456

461457
@Override
462-
public LocalDate getLocalDate(int index) {
463-
Object value = readValue(index);
464-
if (value instanceof LocalDateTime) {
465-
return ((LocalDateTime) value).toLocalDate();
458+
public LocalDate getLocalDate(String colName) {
459+
Object value = readValue(colName);
460+
if (value instanceof ZonedDateTime) {
461+
return ((ZonedDateTime) value).toLocalDate();
466462
}
467463
return (LocalDate) value;
464+
468465
}
469466

470467
@Override
471468
public LocalDateTime getLocalDateTime(String colName) {
472469
Object value = readValue(colName);
473-
if (value instanceof LocalDate) {
474-
return ((LocalDate) value).atStartOfDay();
470+
if (value instanceof ZonedDateTime) {
471+
return ((ZonedDateTime) value).toLocalDateTime();
475472
}
476473
return (LocalDateTime) value;
477474
}
478475

479476
@Override
480477
public LocalDateTime getLocalDateTime(int index) {
481-
return readValue(index);
478+
Object value = readValue(index);
479+
if (value instanceof ZonedDateTime) {
480+
return ((ZonedDateTime) value).toLocalDateTime();
481+
}
482+
return (LocalDateTime) value;
482483
}
483484

484485
@Override

client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
package com.clickhouse.client.api.insert;
22

33
import com.clickhouse.client.api.internal.ValidationUtils;
4+
import com.clickhouse.client.config.ClickHouseClientOption;
45

5-
import java.util.Collections;
66
import java.util.HashMap;
77
import java.util.Map;
88

99
public class InsertSettings {
1010
private static final int DEFAULT_INPUT_STREAM_BATCH_SIZE = 8196;
1111

12-
private String queryId;
1312
private int inputStreamCopyBufferSize;
1413
private String operationId;
1514
Map<String, Object> rawSettings;
@@ -27,7 +26,6 @@ public InsertSettings(Map<String, Object> settings) {
2726

2827
private void setDefaults() {// Default settings, for now a very small list
2928
this.setInputStreamCopyBufferSize(DEFAULT_INPUT_STREAM_BATCH_SIZE);
30-
this.queryId = null;
3129
}
3230

3331
/**
@@ -57,7 +55,7 @@ public void setOption(String option, Object value) {
5755
* @return all settings
5856
*/
5957
public Map<String, Object> getAllSettings() {
60-
return Collections.unmodifiableMap(rawSettings);
58+
return rawSettings;
6159
}
6260

6361
/**
@@ -72,14 +70,14 @@ public InsertSettings setDeduplicationToken(String token) {
7270
}
7371

7472
public String getQueryId() {
75-
return this.queryId;
73+
return (String) rawSettings.get(ClickHouseClientOption.QUERY_ID.getKey());
7674
}
7775

7876
/**
7977
* Sets the query id. This id will be sent to the server and can be used to identify the query.
8078
*/
8179
public InsertSettings setQueryId(String queryId) {
82-
this.queryId = queryId;
80+
rawSettings.put(ClickHouseClientOption.QUERY_ID.getKey(), queryId);
8381
return this;
8482
}
8583

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,20 @@
5353

5454
import javax.net.ssl.SSLContext;
5555
import javax.net.ssl.SSLException;
56+
import java.io.BufferedReader;
5657
import java.io.ByteArrayOutputStream;
5758
import java.io.IOException;
59+
import java.io.InputStream;
60+
import java.io.InputStreamReader;
5861
import java.io.OutputStream;
5962
import java.net.ConnectException;
6063
import java.net.InetSocketAddress;
6164
import java.net.NoRouteToHostException;
6265
import java.net.URI;
6366
import java.net.URISyntaxException;
6467
import java.net.UnknownHostException;
68+
import java.nio.ByteBuffer;
69+
import java.nio.charset.StandardCharsets;
6570
import java.security.NoSuchAlgorithmException;
6671
import java.util.Base64;
6772
import java.util.EnumSet;
@@ -272,18 +277,47 @@ public CloseableHttpClient createHttpClient() {
272277
return clientBuilder.build();
273278
}
274279

280+
private static final String ERROR_CODE_PREFIX_PATTERN = "Code: %d. DB::Exception:";
275281
/**
276282
* Reads status line and if error tries to parse response body to get server error message.
277283
*
278284
* @param httpResponse - HTTP response
279285
* @return
280286
*/
281287
public Exception readError(ClassicHttpResponse httpResponse) {
282-
try (ByteArrayOutputStream out = new ByteArrayOutputStream(ERROR_BODY_BUFFER_SIZE)) {
283-
httpResponse.getEntity().writeTo(out);
284-
String message = out.toString();
285-
int serverCode = getHeaderInt(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE), 0);
286-
return new ServerException(serverCode, message);
288+
int serverCode = getHeaderInt(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE), 0);
289+
try (InputStream body = httpResponse.getEntity().getContent()) {
290+
291+
byte [] buffer = new byte[ERROR_BODY_BUFFER_SIZE];
292+
byte [] lookUpStr = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode).getBytes(StandardCharsets.UTF_8);
293+
while (true) {
294+
int rBytes = body.read(buffer);
295+
if (rBytes == -1) {
296+
break;
297+
} else {
298+
for (int i = 0; i < rBytes; i++) {
299+
if (buffer[i] == lookUpStr[0]) {
300+
boolean found = true;
301+
for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) {
302+
if (buffer[i + j] != lookUpStr[j]) {
303+
found = false;
304+
break;
305+
}
306+
}
307+
if (found) {
308+
int start = i;
309+
while (i < rBytes && buffer[i] != '\n') {
310+
i++;
311+
}
312+
313+
return new ServerException(serverCode, new String(buffer, start, i -start, StandardCharsets.UTF_8));
314+
}
315+
}
316+
}
317+
}
318+
}
319+
320+
return new ServerException(serverCode, String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message>");
287321
} catch (IOException e) {
288322
throw new ClientException("Failed to read response body", e);
289323
}
@@ -307,12 +341,13 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
307341
.build();
308342
req.setConfig(httpReqConfig);
309343
// setting entity. wrapping if compression is enabled
310-
req.setEntity(wrapEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback)));
344+
req.setEntity(wrapEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback), false));
311345

312346
HttpClientContext context = HttpClientContext.create();
313347

314348
try {
315349
ClassicHttpResponse httpResponse = httpClient.executeOpen(null, req, context);
350+
httpResponse.setEntity(wrapEntity(httpResponse.getEntity(), true));
316351
if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) {
317352
throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings.");
318353
} else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST &&
@@ -329,8 +364,6 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
329364
httpResponse.close();
330365
return httpResponse;
331366
}
332-
333-
httpResponse.setEntity(wrapEntity(httpResponse.getEntity()));
334367
return httpResponse;
335368

336369
} catch (UnknownHostException e) {
@@ -354,6 +387,9 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
354387
if (requestConfig.containsKey(ClickHouseClientOption.FORMAT.getKey())) {
355388
req.addHeader(ClickHouseHttpProto.HEADER_FORMAT, requestConfig.get(ClickHouseClientOption.FORMAT.getKey()));
356389
}
390+
if (requestConfig.containsKey(ClickHouseClientOption.QUERY_ID.getKey())) {
391+
req.addHeader(ClickHouseHttpProto.HEADER_QUERY_ID, requestConfig.get(ClickHouseClientOption.QUERY_ID.getKey()).toString());
392+
}
357393
}
358394
req.addHeader(ClickHouseHttpProto.HEADER_DATABASE, chConfig.get(ClickHouseClientOption.DATABASE.getKey()));
359395
req.addHeader(ClickHouseHttpProto.HEADER_DB_USER, chConfig.get(ClickHouseDefaults.USER.getKey()));
@@ -413,13 +449,13 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
413449
}
414450
}
415451

416-
private HttpEntity wrapEntity(HttpEntity httpEntity) {
452+
private HttpEntity wrapEntity(HttpEntity httpEntity, boolean isResponse) {
417453
boolean serverCompression = chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true");
418454
boolean clientCompression = chConfiguration.getOrDefault(ClickHouseClientOption.DECOMPRESS.getKey(), "false").equalsIgnoreCase("true");
419455
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
420456
if (serverCompression || clientCompression) {
421457
return new LZ4Entity(httpEntity, useHttpCompression, serverCompression, clientCompression,
422-
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"));
458+
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), isResponse);
423459
} else {
424460
return httpEntity;
425461
}

client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,55 +16,70 @@
1616
class LZ4Entity implements HttpEntity {
1717

1818
private HttpEntity httpEntity;
19-
19+
2020
private final boolean useHttpCompression;
21-
private final boolean serverCompression;
22-
private final boolean clientCompression;
23-
21+
2422
private final int bufferSize;
2523

24+
private final boolean isResponse;
25+
26+
private boolean serverCompression;
27+
28+
private boolean clientCompression;
29+
2630
LZ4Entity(HttpEntity httpEntity, boolean useHttpCompression, boolean serverCompression, boolean clientCompression,
27-
int bufferSize) {
31+
int bufferSize, boolean isResponse) {
2832
this.httpEntity = httpEntity;
2933
this.useHttpCompression = useHttpCompression;
34+
this.bufferSize = bufferSize;
3035
this.serverCompression = serverCompression;
3136
this.clientCompression = clientCompression;
32-
this.bufferSize = bufferSize;
37+
this.isResponse = isResponse;
3338
}
34-
39+
3540
@Override
3641
public boolean isRepeatable() {
3742
return httpEntity.isRepeatable();
3843
}
3944

4045
@Override
4146
public InputStream getContent() throws IOException, UnsupportedOperationException {
42-
if (serverCompression && useHttpCompression) {
43-
InputStream content = httpEntity.getContent();
44-
try {
45-
return new FramedLZ4CompressorInputStream(content);
46-
} catch (IOException e) {
47-
// This is the easiest way to handle empty content because
48-
// - streams at this point wrapped with something else and we can't check content length
49-
// - exception is thrown with no details
50-
// So we just return original content and if there is a real data in it we will get error later
51-
return content;
52-
}
47+
if (!isResponse && serverCompression) {
48+
throw new UnsupportedOperationException("Unsupported: getting compressed content of request");
5349
} else if (serverCompression) {
54-
return new ClickHouseLZ4InputStream(httpEntity.getContent(), LZ4Factory.fastestInstance().fastDecompressor(),
55-
bufferSize);
50+
if (useHttpCompression) {
51+
InputStream content = httpEntity.getContent();
52+
try {
53+
return new FramedLZ4CompressorInputStream(content);
54+
} catch (IOException e) {
55+
// This is the easiest way to handle empty content because
56+
// - streams at this point wrapped with something else and we can't check content length
57+
// - exception is thrown with no details
58+
// So we just return original content and if there is a real data in it we will get error later
59+
return content;
60+
}
61+
} else {
62+
return new ClickHouseLZ4InputStream(httpEntity.getContent(), LZ4Factory.fastestInstance().fastDecompressor(),
63+
bufferSize);
64+
}
5665
} else {
5766
return httpEntity.getContent();
5867
}
5968
}
6069

6170
@Override
6271
public void writeTo(OutputStream outStream) throws IOException {
63-
if (clientCompression && useHttpCompression) {
64-
httpEntity.writeTo(new FramedLZ4CompressorOutputStream(outStream));
72+
if (isResponse && serverCompression) {
73+
// called by us to get compressed response
74+
throw new UnsupportedOperationException("Unsupported: writing compressed response to elsewhere");
6575
} else if (clientCompression) {
66-
httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, LZ4Factory.fastestInstance().fastCompressor(),
67-
bufferSize));
76+
// called by client to send data
77+
if (useHttpCompression) {
78+
httpEntity.writeTo(new FramedLZ4CompressorOutputStream(outStream));
79+
} else {
80+
httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, LZ4Factory.fastestInstance().fastCompressor(),
81+
bufferSize));
82+
}
6883
} else {
6984
httpEntity.writeTo(outStream);
7085
}

0 commit comments

Comments
 (0)