Skip to content

Commit f499762

Browse files
authored
Merge pull request #1732 from ClickHouse/fix_inputstream_client_v2
[client-v2] Migrate to use InputStream do read data
2 parents a6a8a22 + 881cda2 commit f499762

File tree

12 files changed

+367
-110
lines changed

12 files changed

+367
-110
lines changed

clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ private ClickHouseHttpResponse buildResponse(ClickHouseConfig config, CloseableH
115115
// {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
116116
String displayName = getResponseHeader(response, "X-ClickHouse-Server-Display-Name", server.getHost());
117117
String queryId = getResponseHeader(response, "X-ClickHouse-Query-Id", "");
118-
String summary = getResponseHeader(response, "X-ClickHouse-Summary", "{}");
118+
Header hSum = response.getLastHeader("X-ClickHouse-Summary");
119+
String summary = hSum == null ? "{}" : hSum.getValue(); // getResponseHeader(response, "X-ClickHouse-Summary", "{}");
119120

120121
ClickHouseFormat format = config.getFormat();
121122
TimeZone timeZone = config.getServerTimeZone();

clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public enum ClickHouseHttpOption implements ClickHouseOption {
5858
// "Enables or disables X-ClickHouse-Progress HTTP response headers in
5959
// clickhouse-server responses."),
6060
// SEND_PROGRESS_INTERVAL("http_headers_progress_interval_ms", 3000, ""),
61-
// WAIT_END_OF_QUERY("wait_end_of_query", false, ""),
61+
WAIT_END_OF_QUERY("wait_end_of_query", false, ""),
6262

6363
/**
6464
* Whether to remember last set role and send them in every next requests as query parameters.

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.clickhouse.client.api.query.Records;
3131
import com.clickhouse.client.config.ClickHouseClientOption;
3232
import com.clickhouse.client.config.ClickHouseDefaults;
33+
import com.clickhouse.client.http.config.ClickHouseHttpOption;
3334
import com.clickhouse.data.ClickHouseColumn;
3435
import com.clickhouse.data.ClickHouseDataStreamFactory;
3536
import com.clickhouse.data.ClickHouseFormat;
@@ -820,6 +821,7 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
820821
settings = new QuerySettings();
821822
}
822823
settings.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
824+
settings.waitEndOfQuery(true); // we rely on the summery
823825
ClientStatisticsHolder clientStats = new ClientStatisticsHolder();
824826
clientStats.start("query");
825827
ClickHouseClient client = ClientV1AdaptorHelper.createClient(configuration);
@@ -865,9 +867,9 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
865867
public List<GenericRecord> queryAll(String sqlQuery) {
866868
try {
867869
int operationTimeout = getOperationTimeout();
868-
869-
try (QueryResponse response = operationTimeout == 0 ? query(sqlQuery).get() :
870-
query(sqlQuery).get(operationTimeout, TimeUnit.MILLISECONDS)) {
870+
QuerySettings settings = new QuerySettings().waitEndOfQuery(true);
871+
try (QueryResponse response = operationTimeout == 0 ? query(sqlQuery, settings).get() :
872+
query(sqlQuery, settings).get(operationTimeout, TimeUnit.MILLISECONDS)) {
871873
List<GenericRecord> records = new ArrayList<>();
872874
if (response.getResultRows() > 0) {
873875
ClickHouseBinaryFormatReader reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream());

client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package com.clickhouse.client.api.data_formats;
22

3-
import com.clickhouse.client.api.ClientException;
43
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
4+
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
55
import com.clickhouse.client.api.query.QuerySettings;
66
import com.clickhouse.data.ClickHouseColumn;
7-
import com.clickhouse.data.ClickHouseDataType;
87

9-
import java.io.EOFException;
108
import java.io.IOException;
119
import java.io.InputStream;
1210
import java.util.ArrayList;
@@ -43,14 +41,15 @@ protected void readRecord(Map<String, Object> record) throws IOException {
4341
}
4442

4543
private void readBlock() throws IOException {
46-
int nColumns = chInputStream.readVarInt();
47-
int nRows = chInputStream.readVarInt();
44+
int nColumns = BinaryStreamReader.readVarInt(input);
45+
int nRows = BinaryStreamReader.readVarInt(input);
4846

4947
List<String> names = new ArrayList<>(nColumns);
5048
List<String> types = new ArrayList<>(nColumns);
5149
currentBlock = new Block(names, types, nRows);
5250
for (int i = 0; i < nColumns; i++) {
53-
ClickHouseColumn column = ClickHouseColumn.of(chInputStream.readUnicodeString(), chInputStream.readUnicodeString());
51+
ClickHouseColumn column = ClickHouseColumn.of(BinaryStreamReader.readString(input),
52+
BinaryStreamReader.readString(input));
5453
names.add(column.getColumnName());
5554
types.add(column.getDataType().name());
5655

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatReader.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package com.clickhouse.client.api.data_formats;
22

3-
import com.clickhouse.client.api.ClientException;
43
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
54
import com.clickhouse.client.api.metadata.TableSchema;
65
import com.clickhouse.client.api.query.QuerySettings;
76
import com.clickhouse.data.ClickHouseColumn;
87

9-
import java.io.EOFException;
108
import java.io.IOException;
119
import java.io.InputStream;
1210
import java.util.Map;

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,17 @@
22

33
import com.clickhouse.client.api.ClientException;
44
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
5+
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
56
import com.clickhouse.client.api.metadata.TableSchema;
67
import com.clickhouse.client.api.query.QuerySettings;
78
import com.clickhouse.data.ClickHouseColumn;
89

9-
import java.io.EOFException;
1010
import java.io.IOException;
1111
import java.io.InputStream;
1212
import java.util.ArrayList;
1313
import java.util.Iterator;
1414
import java.util.List;
1515
import java.util.Map;
16-
import java.util.NoSuchElementException;
1716

1817
public class RowBinaryWithNamesAndTypesFormatReader extends AbstractBinaryFormatReader implements Iterator<Map<String, Object>> {
1918

@@ -28,18 +27,18 @@ public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySett
2827

2928
private void readSchema() {
3029
try {
31-
if (inputStream.available() < 1) {
30+
if (input.available() < 1) {
3231
return;
3332
}
3433
TableSchema headerSchema = new TableSchema();
3534
List<String> columns = new ArrayList<>();
36-
int nCol = chInputStream.readVarInt();
35+
int nCol = BinaryStreamReader.readVarInt(input);
3736
for (int i = 0; i < nCol; i++) {
38-
columns.add(chInputStream.readUnicodeString());
37+
columns.add(BinaryStreamReader.readString(input));
3938
}
4039

4140
for (int i = 0; i < nCol; i++) {
42-
headerSchema.addColumn(columns.get(i), chInputStream.readUnicodeString());
41+
headerSchema.addColumn(columns.get(i), BinaryStreamReader.readString(input));
4342
}
4443

4544
setSchema(headerSchema);

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesFormatReader.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package com.clickhouse.client.api.data_formats;
22

3-
import com.clickhouse.client.api.ClientException;
43
import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
4+
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
55
import com.clickhouse.client.api.metadata.TableSchema;
66
import com.clickhouse.client.api.query.QuerySettings;
77
import com.clickhouse.data.ClickHouseColumn;
88

9-
import java.io.EOFException;
109
import java.io.IOException;
1110
import java.io.InputStream;
1211
import java.util.ArrayList;
@@ -30,9 +29,9 @@ public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings que
3029
public void readRecord(Map<String, Object> record) throws IOException {
3130
if (columns == null) {
3231
columns = new ArrayList<>();
33-
int nCol = chInputStream.readVarInt();
32+
int nCol = BinaryStreamReader.readVarInt(input);
3433
for (int i = 0; i < nCol; i++) {
35-
columns.add(chInputStream.readUnicodeString());
34+
columns.add(BinaryStreamReader.readString(input));
3635
}
3736

3837
columns = Collections.unmodifiableList(columns);

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import com.clickhouse.client.api.query.NullValueException;
77
import com.clickhouse.client.api.query.QuerySettings;
88
import com.clickhouse.data.ClickHouseColumn;
9-
import com.clickhouse.data.ClickHouseInputStream;
109
import com.clickhouse.data.value.ClickHouseArrayValue;
1110
import com.clickhouse.data.value.ClickHouseGeoMultiPolygonValue;
1211
import com.clickhouse.data.value.ClickHouseGeoPointValue;
@@ -41,9 +40,7 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
4140

4241
private static final Logger LOG = LoggerFactory.getLogger(AbstractBinaryFormatReader.class);
4342

44-
protected InputStream inputStream;
45-
46-
protected ClickHouseInputStream chInputStream;
43+
protected InputStream input;
4744

4845
protected Map<String, Object> settings;
4946

@@ -54,15 +51,12 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
5451
protected volatile boolean hasNext = true;
5552

5653
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
57-
this.inputStream = inputStream;
58-
this.chInputStream = inputStream instanceof ClickHouseInputStream ?
59-
(ClickHouseInputStream) inputStream : ClickHouseInputStream.of(inputStream);
54+
this.input = inputStream;
6055
this.settings = querySettings == null ? Collections.emptyMap() : new HashMap<>(querySettings.getAllSettings());
61-
this.binaryStreamReader = new BinaryStreamReader(chInputStream, LOG);
56+
this.binaryStreamReader = new BinaryStreamReader(inputStream, LOG);
6257
setSchema(schema);
6358
}
6459

65-
6660
protected Map<String, Object> currentRecord = new ConcurrentHashMap<>();
6761

6862
protected abstract void readRecord(Map<String, Object> record) throws IOException;
@@ -85,7 +79,7 @@ public <T> T readValue(String colName) {
8579
public boolean hasNext() {
8680
if (hasNext) {
8781
try {
88-
hasNext = chInputStream.available() > 0;
82+
hasNext = input.available() > 0;
8983
return hasNext;
9084
} catch (IOException e) {
9185
hasNext = false;

0 commit comments

Comments
 (0)