Skip to content

Commit 79a6589

Browse files
committed
fix reading binary from all streams
1 parent 098772d commit 79a6589

File tree

12 files changed

+166
-91
lines changed

12 files changed

+166
-91
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ public Builder setSocketLinger(int secondsToWait) {
392392
* @param enabled - indicates if server response compression is enabled
393393
*/
394394
public Builder compressServerResponse(boolean enabled) {
395-
this.configuration.put("compress", String.valueOf(enabled));
395+
this.configuration.put(ClickHouseClientOption.COMPRESS.getKey(), String.valueOf(enabled));
396396
return this;
397397
}
398398

@@ -403,7 +403,7 @@ public Builder compressServerResponse(boolean enabled) {
403403
* @param enabled - indicates if client request compression is enabled
404404
*/
405405
public Builder compressClientRequest(boolean enabled) {
406-
this.configuration.put("decompress", String.valueOf(enabled));
406+
this.configuration.put(ClickHouseClientOption.DECOMPRESS.getKey(), String.valueOf(enabled));
407407
return this;
408408
}
409409

@@ -1149,8 +1149,9 @@ public List<GenericRecord> queryAll(String sqlQuery) {
11491149
List<GenericRecord> records = new ArrayList<>();
11501150
if (response.getResultRows() > 0) {
11511151
ClickHouseBinaryFormatReader reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream());
1152-
while (reader.hasNext()) {
1153-
records.add(new MapBackedRecord(reader.next(), reader.getSchema()));
1152+
Map<String, Object> record;
1153+
while ((record = reader.next()) != null) {
1154+
records.add(new MapBackedRecord(record, reader.getSchema()));
11541155
}
11551156
}
11561157
return records;

client-v2/src/main/java/com/clickhouse/client/api/data_formats/ClickHouseBinaryFormatReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public interface ClickHouseBinaryFormatReader {
5252
/**
5353
* Moves cursor to the next row. Must be called before reading the first row.
5454
*
55-
* @return true if there are more rows to read, false otherwise
55+
* @return map filled with column values or null if no more records are available
5656
*/
5757
Map<String, Object> next();
5858

client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.clickhouse.client.api.query.QuerySettings;
66
import com.clickhouse.data.ClickHouseColumn;
77

8+
import java.io.EOFException;
89
import java.io.IOException;
910
import java.io.InputStream;
1011
import java.util.ArrayList;
@@ -28,20 +29,30 @@ public NativeFormatReader(InputStream inputStream) {
2829

2930
public NativeFormatReader(InputStream inputStream, QuerySettings settings) {
3031
super(inputStream, settings, null);
32+
readNextRecord();
3133
}
3234

3335
@Override
34-
protected void readRecord(Map<String, Object> record) throws IOException {
36+
protected boolean readRecord(Map<String, Object> record) throws IOException {
3537
if (currentBlock == null || blockRowIndex >= currentBlock.getnRows()) {
36-
readBlock();
38+
if (!readBlock()) {
39+
return false;
40+
}
3741
}
3842

3943
currentBlock.fillRecord(blockRowIndex, record);
4044
blockRowIndex++;
45+
return true;
4146
}
4247

43-
private void readBlock() throws IOException {
44-
int nColumns = BinaryStreamReader.readVarInt(input);
48+
private boolean readBlock() throws IOException {
49+
int nColumns;
50+
try {
51+
nColumns = BinaryStreamReader.readVarInt(input);
52+
} catch (EOFException e) {
53+
endReached();
54+
return false;
55+
}
4556
int nRows = BinaryStreamReader.readVarInt(input);
4657

4758
List<String> names = new ArrayList<>(nColumns);
@@ -61,6 +72,7 @@ private void readBlock() throws IOException {
6172
currentBlock.add(values);
6273
}
6374
blockRowIndex = 0;
75+
return true;
6476
}
6577

6678
@Override

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatReader.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.clickhouse.client.api.query.QuerySettings;
66
import com.clickhouse.data.ClickHouseColumn;
77

8+
import java.io.EOFException;
89
import java.io.IOException;
910
import java.io.InputStream;
1011
import java.util.Map;
@@ -17,13 +18,27 @@ public RowBinaryFormatReader(InputStream inputStream, TableSchema schema) {
1718

1819
public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
1920
super(inputStream, querySettings, schema);
21+
readNextRecord();
2022
}
2123

2224
@Override
23-
public void readRecord(Map<String, Object> record) throws IOException {
25+
public boolean readRecord(Map<String, Object> record) throws IOException {
26+
boolean firstColumn = true;
2427
for (ClickHouseColumn column : getSchema().getColumns()) {
25-
record.put(column.getColumnName(), binaryStreamReader
26-
.readValue(column));
28+
try {
29+
Object val = binaryStreamReader.readValue(column);
30+
if (val != null) {
31+
record.put(column.getColumnName(),val);
32+
}
33+
firstColumn = false;
34+
} catch (EOFException e) {
35+
if (firstColumn) {
36+
endReached();
37+
return false;
38+
}
39+
throw e;
40+
}
2741
}
42+
return true;
2843
}
2944
}

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@ private void readSchema() {
3030
try {
3131
TableSchema headerSchema = new TableSchema();
3232
List<String> columns = new ArrayList<>();
33-
int nCol = BinaryStreamReader.readVarInt(input);
33+
int nCol;
34+
try {
35+
nCol = BinaryStreamReader.readVarInt(input);
36+
} catch (EOFException e) {
37+
endReached();
38+
return;
39+
}
3440
for (int i = 0; i < nCol; i++) {
3541
columns.add(BinaryStreamReader.readString(input));
3642
}
@@ -44,21 +50,4 @@ private void readSchema() {
4450
throw new ClientException("Failed to read header", e);
4551
}
4652
}
47-
48-
/**
49-
* Reads a row to a map using column definitions from the schema.
50-
* If column type mismatch and cannot be converted, an exception will be thrown.
51-
*
52-
* @param record data destination
53-
* @throws IOException
54-
*/
55-
@Override
56-
public void readRecord(Map<String, Object> record) throws IOException {
57-
for (ClickHouseColumn column : getSchema().getColumns()) {
58-
Object val = binaryStreamReader.readValue(column);
59-
if (val != null) {
60-
record.put(column.getColumnName(),val);
61-
}
62-
}
63-
}
6453
}

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesFormatReader.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.clickhouse.client.api.query.QuerySettings;
77
import com.clickhouse.data.ClickHouseColumn;
88

9+
import java.io.EOFException;
910
import java.io.IOException;
1011
import java.io.InputStream;
1112
import java.util.ArrayList;
@@ -19,28 +20,34 @@ public class RowBinaryWithNamesFormatReader extends AbstractBinaryFormatReader {
1920

2021
public RowBinaryWithNamesFormatReader(InputStream inputStream, TableSchema schema) {
2122
this(inputStream, null, schema);
23+
readNextRecord();
2224
}
2325

2426
public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
2527
super(inputStream, querySettings, schema);
26-
}
28+
int nCol = 0;
29+
try {
30+
nCol = BinaryStreamReader.readVarInt(input);
31+
} catch (EOFException e) {
32+
endReached();
33+
columns = Collections.emptyList();
34+
} catch (IOException e) {
35+
throw new RuntimeException("Failed to read header", e);
36+
}
2737

28-
@Override
29-
public void readRecord(Map<String, Object> record) throws IOException {
30-
if (columns == null) {
31-
columns = new ArrayList<>();
32-
int nCol = BinaryStreamReader.readVarInt(input);
33-
for (int i = 0; i < nCol; i++) {
34-
columns.add(BinaryStreamReader.readString(input));
38+
if (nCol > 0) {
39+
columns = new ArrayList<>(nCol);
40+
try {
41+
for (int i = 0; i < nCol; i++) {
42+
columns.add(BinaryStreamReader.readString(input));
43+
}
44+
} catch (IOException e) {
45+
throw new RuntimeException("Failed to read header", e);
3546
}
3647

3748
columns = Collections.unmodifiableList(columns);
3849
}
39-
40-
for (ClickHouseColumn column : getSchema().getColumns()) {
41-
record.put(column.getColumnName(), binaryStreamReader
42-
.readValue(column));
43-
}
50+
readNextRecord();
4451
}
4552

4653
public List<String> getColumns() {

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
4848

4949
private TableSchema schema;
5050

51-
protected volatile boolean hasNext = true;
51+
private volatile boolean hasNext = true;
5252

5353
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
5454
this.input = inputStream;
@@ -58,8 +58,28 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
5858
}
5959

6060
protected Map<String, Object> currentRecord = new ConcurrentHashMap<>();
61+
protected Map<String, Object> nextRecord = new ConcurrentHashMap<>();
6162

62-
protected abstract void readRecord(Map<String, Object> record) throws IOException;
63+
64+
protected boolean readRecord(Map<String, Object> record) throws IOException {
65+
boolean firstColumn = true;
66+
for (ClickHouseColumn column : getSchema().getColumns()) {
67+
try {
68+
Object val = binaryStreamReader.readValue(column);
69+
if (val != null) {
70+
record.put(column.getColumnName(),val);
71+
}
72+
firstColumn = false;
73+
} catch (EOFException e) {
74+
if (firstColumn) {
75+
endReached();
76+
return false;
77+
}
78+
throw e;
79+
}
80+
}
81+
return true;
82+
}
6383

6484
@Override
6585
public <T> T readValue(int colIndex) {
@@ -77,37 +97,55 @@ public <T> T readValue(String colName) {
7797

7898
@Override
7999
public boolean hasNext() {
80-
if (hasNext) {
81-
try {
82-
hasNext = input.available() > 0;
83-
return hasNext;
84-
} catch (IOException e) {
100+
return hasNext;
101+
}
102+
103+
104+
protected void readNextRecord() {
105+
try {
106+
nextRecord.clear();
107+
if (!readRecord(nextRecord)) {
85108
hasNext = false;
86-
LOG.error("Failed to check if there is more data available", e);
87-
return false;
88109
}
110+
} catch (IOException e) {
111+
hasNext = false;
112+
throw new ClientException("Failed to read next row", e);
89113
}
90-
return false;
91114
}
92115

93116
@Override
94117
public Map<String, Object> next() {
95118
if (!hasNext) {
96-
throw new NoSuchElementException();
119+
return null;
97120
}
98121

99-
try {
100-
readRecord(currentRecord);
122+
if (!nextRecord.isEmpty()) {
123+
Map<String, Object> tmp = currentRecord;
124+
currentRecord = nextRecord;
125+
nextRecord = tmp;
126+
readNextRecord();
101127
return currentRecord;
102-
} catch (EOFException e) {
103-
hasNext = false;
104-
return null;
105-
} catch (IOException e) {
106-
hasNext = false;
107-
throw new ClientException("Failed to read row", e);
128+
} else {
129+
try {
130+
currentRecord.clear();
131+
if (readRecord(currentRecord)) {
132+
readNextRecord();
133+
return currentRecord;
134+
} else {
135+
currentRecord = null;
136+
return null;
137+
}
138+
} catch (IOException e) {
139+
hasNext = false;
140+
throw new ClientException("Failed to read row", e);
141+
}
108142
}
109143
}
110144

145+
protected void endReached() {
146+
hasNext = false;
147+
}
148+
111149
protected void setSchema(TableSchema schema) {
112150
this.schema = schema;
113151
}

0 commit comments

Comments
 (0)