Skip to content

Commit 18d0ca1

Browse files
authored
Merge pull request #1779 from ClickHouse/fix_testing_client_v2
improved queryAll() to use less mem
2 parents 762d78a + d7ed4a5 commit 18d0ca1

File tree

7 files changed

+115
-33
lines changed

7 files changed

+115
-33
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.util.Collections;
6868
import java.util.HashMap;
6969
import java.util.HashSet;
70+
import java.util.LinkedHashMap;
7071
import java.util.List;
7172
import java.util.Map;
7273
import java.util.Set;
@@ -1421,10 +1422,11 @@ public List<GenericRecord> queryAll(String sqlQuery) {
14211422
query(sqlQuery, settings).get(operationTimeout, TimeUnit.MILLISECONDS)) {
14221423
List<GenericRecord> records = new ArrayList<>();
14231424
if (response.getResultRows() > 0) {
1424-
ClickHouseBinaryFormatReader reader =
1425+
RowBinaryWithNamesAndTypesFormatReader reader =
14251426
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
1427+
14261428
Map<String, Object> record;
1427-
while ((record = reader.next()) != null) {
1429+
while (reader.readRecord((record = new LinkedHashMap<>()))) {
14281430
records.add(new MapBackedRecord(record, reader.getSchema()));
14291431
}
14301432
}

client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public NativeFormatReader(InputStream inputStream, QuerySettings settings) {
3333
}
3434

3535
@Override
36-
protected boolean readRecord(Map<String, Object> record) throws IOException {
36+
public boolean readRecord(Map<String, Object> record) throws IOException {
3737
if (currentBlock == null || blockRowIndex >= currentBlock.getnRows()) {
3838
if (!readBlock()) {
3939
return false;

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.time.ZoneOffset;
3131
import java.time.ZonedDateTime;
3232
import java.time.temporal.ChronoUnit;
33+
import java.util.ArrayList;
3334
import java.util.Collections;
3435
import java.util.HashMap;
3536
import java.util.List;
@@ -38,6 +39,7 @@
3839
import java.util.TimeZone;
3940
import java.util.UUID;
4041
import java.util.concurrent.ConcurrentHashMap;
42+
import java.util.function.Function;
4143

4244
public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader {
4345

@@ -70,7 +72,18 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
7072
protected Map<String, Object> nextRecord = new ConcurrentHashMap<>();
7173

7274

73-
protected boolean readRecord(Map<String, Object> record) throws IOException {
75+
/**
76+
* It is still internal method and should be used with care.
77+
* Usually this method is called to read next record into internal object and affects hasNext() method.
78+
* So after calling this one:
79+
* - hasNext(), next() should not be called
80+
* - stream should be read with readRecord() method fully
81+
*
82+
* @param record
83+
* @return
84+
* @throws IOException
85+
*/
86+
public boolean readRecord(Map<String, Object> record) throws IOException {
7487
boolean firstColumn = true;
7588
for (ClickHouseColumn column : getSchema().getColumns()) {
7689
try {

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryReaderBackedRecord.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,4 +346,14 @@ public LocalDateTime getLocalDateTime(String colName) {
346346
public LocalDateTime getLocalDateTime(int index) {
347347
return reader.getLocalDateTime(index);
348348
}
349+
350+
@Override
351+
public Object getObject(String colName) {
352+
return reader.readValue(colName);
353+
}
354+
355+
@Override
356+
public Object getObject(int index) {
357+
return reader.readValue(index);
358+
}
349359
}

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/MapBackedRecord.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,4 +480,14 @@ public LocalDateTime getLocalDateTime(String colName) {
480480
public LocalDateTime getLocalDateTime(int index) {
481481
return readValue(index);
482482
}
483+
484+
@Override
485+
public Object getObject(String colName) {
486+
return readValue(colName);
487+
}
488+
489+
@Override
490+
public Object getObject(int index) {
491+
return readValue(index);
492+
}
483493
}

client-v2/src/main/java/com/clickhouse/client/api/query/GenericRecord.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
public interface GenericRecord {
2121

22-
2322
/**
2423
* Reads column with name `colName` as a string.
2524
*
@@ -486,4 +485,8 @@ public interface GenericRecord {
486485
LocalDateTime getLocalDateTime(String colName);
487486

488487
LocalDateTime getLocalDateTime(int index);
488+
489+
Object getObject(String colName);
490+
491+
Object getObject(int index);
489492
}

client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
import com.clickhouse.client.api.ClientException;
1515
import com.clickhouse.client.api.DataTypeUtils;
1616
import com.clickhouse.client.api.ServerException;
17-
import com.clickhouse.client.api.enums.Protocol;
1817
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
1918
import com.clickhouse.client.api.data_formats.NativeFormatReader;
2019
import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
2120
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
2221
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
22+
import com.clickhouse.client.api.enums.Protocol;
2323
import com.clickhouse.client.api.insert.InsertSettings;
2424
import com.clickhouse.client.api.metadata.TableSchema;
2525
import com.clickhouse.client.api.metrics.ClientMetrics;
@@ -52,19 +52,17 @@
5252
import java.math.BigInteger;
5353
import java.time.LocalDate;
5454
import java.time.LocalDateTime;
55-
import java.time.OffsetDateTime;
5655
import java.time.ZoneId;
5756
import java.time.ZonedDateTime;
58-
import java.time.temporal.ChronoUnit;
5957
import java.util.ArrayList;
6058
import java.util.Arrays;
6159
import java.util.HashMap;
60+
import java.util.HashSet;
6261
import java.util.Iterator;
6362
import java.util.List;
6463
import java.util.Map;
65-
import java.util.NoSuchElementException;
66-
import java.util.Properties;
6764
import java.util.Random;
65+
import java.util.Set;
6866
import java.util.UUID;
6967
import java.util.concurrent.CountDownLatch;
7068
import java.util.concurrent.ExecutionException;
@@ -77,6 +75,7 @@
7775
import java.util.function.Supplier;
7876
import java.util.stream.BaseStream;
7977
import java.util.stream.IntStream;
78+
import java.util.stream.Collectors;
8079

8180
public class QueryTests extends BaseIntegrationTest {
8281

@@ -150,16 +149,19 @@ public void testSimpleQueryWithTSV() {
150149

151150
@Test(groups = {"integration"})
152151
public void testReadRecords() throws Exception {
153-
prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10);
152+
List<Map<String, Object>> dataset = prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10);
154153

155154
Records records = client.queryRecords("SELECT * FROM " + DATASET_TABLE).get(3, TimeUnit.SECONDS);
156155
Assert.assertEquals(records.getResultRows(), 10, "Unexpected number of rows");
156+
157+
Iterator<Map<String, Object>> dataIterator = dataset.iterator();
157158
for (GenericRecord record : records) {
158-
System.out.println(record.getLong(1)); // UInt32 column col1
159-
System.out.println(record.getInteger(2)); // Int32 column col2
160-
System.out.println(record.getString(3)); // string column col3
161-
System.out.println(record.getLong(4)); // Int64 column col4
162-
System.out.println(record.getString(5)); // string column col5
159+
Map<String,Object> dsRecords = dataIterator.next();
160+
Assert.assertEquals(record.getLong("col1"), dsRecords.get("col1"));
161+
Assert.assertEquals(record.getInteger("col2"), dsRecords.get("col2"));
162+
Assert.assertEquals(record.getString("col3"), dsRecords.get("col3"));
163+
Assert.assertEquals(record.getLong("col4"), dsRecords.get("col4"));
164+
Assert.assertEquals(record.getString("col5"), dsRecords.get("col5"));
163165
}
164166
}
165167

@@ -179,6 +181,33 @@ public void testBigUnsignedInt() throws Exception {
179181
Assert.assertEquals(firstRecord.getBigInteger("i256"), expected256);
180182
}
181183

184+
@Test(groups = {"integration"})
185+
public void testReadRecordsWithStreamAPI() throws Exception {
186+
final int tables = 10;
187+
188+
Set<String> expectedTableNames = new HashSet<>();
189+
for (int i = 0; i < tables; i++) {
190+
final String tableName = "a_" + i;
191+
expectedTableNames.add(tableName);
192+
client.execute("DROP TABLE IF EXISTS default." + tableName);
193+
client.execute("CREATE TABLE " + tableName +" (x UInt32) ENGINE = Memory");
194+
}
195+
196+
Records records = client.queryRecords("SHOW TABLES").get(3, TimeUnit.SECONDS);
197+
198+
HashSet<String> tableNames = new HashSet<>();
199+
records.forEach(r -> {
200+
tableNames.add(r.getString(1));
201+
});
202+
Assert.assertTrue(tableNames.containsAll(expectedTableNames));
203+
204+
Assert.expectThrows(IllegalStateException.class, () -> {
205+
records.forEach(r -> {
206+
System.out.println(r);
207+
});
208+
});
209+
}
210+
182211
@Test(groups = {"integration"})
183212
public void testReadRecordsGetFirstRecord() throws Exception {
184213
prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10);
@@ -200,12 +229,20 @@ public void testReadRecordsNoResult() throws Exception {
200229

201230
@Test(groups = {"integration"})
202231
public void testQueryAll() throws Exception {
203-
testQueryAll(10);
204-
}
205-
public void testQueryAll(int numberOfRecords) throws Exception {
206-
prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, numberOfRecords);
207-
GenericRecord hostnameRecord = client.queryAll("SELECT hostname()").stream().findFirst().get();
208-
Assert.assertNotNull(hostnameRecord);
232+
List<Map<String, Object>> dataset = prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10);
233+
List<GenericRecord> records = client.queryAll("SELECT * FROM " + DATASET_TABLE + " LIMIT " + dataset.size());
234+
Assert.assertFalse(records.isEmpty());
235+
236+
for (String colDefinition : DATASET_COLUMNS) {
237+
// result values
238+
String colName = colDefinition.split(" ")[0];
239+
List<Object> colValues = records.stream().map(r -> r.getObject(colName)).collect(Collectors.toList());
240+
Assert.assertEquals(colValues.size(), dataset.size());
241+
242+
// dataset values
243+
List<Object> dataValue = dataset.stream().map(d -> d.get(colName)).collect(Collectors.toList());
244+
Assert.assertEquals(colValues, dataValue, "Failed for column " + colName);
245+
}
209246
}
210247

211248
@Test(groups = {"integration"})
@@ -223,6 +260,24 @@ public void testQueryAllNoResult() throws Exception {
223260
Assert.assertTrue(records.isEmpty());
224261
}
225262

263+
@Test
264+
public void testQueryAllTableNames() {
265+
final int tables = 10;
266+
Set<String> expectedTableNames = new HashSet<>();
267+
for (int i = 0; i < tables; i++) {
268+
final String tableName = "a_" + i;
269+
expectedTableNames.add(tableName);
270+
client.execute("DROP TABLE IF EXISTS default." + tableName);
271+
client.execute("CREATE TABLE " + tableName +" (x UInt32) ENGINE = Memory");
272+
}
273+
274+
List<GenericRecord> records = client.queryAll("SHOW TABLES");
275+
Assert.assertTrue(records.size() >= tables);
276+
277+
Set<String> tableNames = records.stream().map(r -> r.getString(1)).collect(Collectors.toSet());
278+
Assert.assertTrue(tableNames.containsAll(expectedTableNames));
279+
}
280+
226281
@Test(groups = {"integration"})
227282
public void testQueryJSON() throws ExecutionException, InterruptedException {
228283
Map<String, Object> datasetRecord = prepareSimpleDataSet();
@@ -975,7 +1030,6 @@ public void testDataTypes(List<String> columns, List<Supplier<String>> valueGene
9751030
colIndex++;
9761031
try {
9771032
verifier.accept(reader);
978-
System.out.println("Verified " + colIndex);
9791033
} catch (Exception e) {
9801034
Assert.fail("Failed to verify " + columns.get(colIndex), e);
9811035
}
@@ -1000,8 +1054,6 @@ public void testQueryMetrics() throws Exception {
10001054

10011055
// Stats should be available after the query is done
10021056
OperationMetrics metrics = response.getMetrics();
1003-
System.out.println("Server read rows: " + metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong());
1004-
System.out.println("Client stats: " + metrics.getMetric(ClientMetrics.OP_DURATION).getLong());
10051057

10061058
Assert.assertEquals(metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong(), 10); // 10 rows in the table
10071059
Assert.assertEquals(metrics.getMetric(ServerMetrics.RESULT_ROWS).getLong(), 3);
@@ -1018,8 +1070,6 @@ public void testQueryMetrics() throws Exception {
10181070
response = client.query(insertStmtBuilder.toString(), settings).get();
10191071

10201072
metrics = response.getMetrics();
1021-
System.out.println("Server read rows: " + metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong());
1022-
System.out.println("Client stats: " + metrics.getMetric(ClientMetrics.OP_DURATION).getLong());
10231073

10241074
Assert.assertEquals(metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong(), rowsToInsert); // 10 rows in the table
10251075
Assert.assertEquals(metrics.getMetric(ServerMetrics.RESULT_ROWS).getLong(), rowsToInsert);
@@ -1087,7 +1137,6 @@ private List<Map<String, Object>> prepareDataSet(String table, List<String> colu
10871137
insertStmtBuilder.append("), ");
10881138
data.add(values);
10891139
}
1090-
System.out.println("Insert statement: " + insertStmtBuilder);
10911140
request = client.write(getServer(ClickHouseProtocol.HTTP))
10921141
.query(insertStmtBuilder.toString());
10931142
try (ClickHouseResponse response = request.executeAndWait()) {}
@@ -1232,9 +1281,7 @@ public void testServerTimeZoneFromHeader() {
12321281
reader.next();
12331282

12341283
LocalDateTime serverTime = reader.getLocalDateTime(1);
1235-
System.out.println("Server time: " + serverTime);
12361284
LocalDateTime serverUtcTime = reader.getLocalDateTime(2);
1237-
System.out.println("Server UTC time: " + serverUtcTime);
12381285

12391286
ZonedDateTime serverTimeZ = serverTime.atZone(ZoneId.of(requestTimeZone));
12401287
ZonedDateTime serverUtcTimeZ = serverUtcTime.atZone(ZoneId.of("UTC"));
@@ -1265,11 +1312,8 @@ public void testClientUseOwnTimeZone() {
12651312
reader.next();
12661313

12671314
LocalDateTime serverTime = reader.getLocalDateTime(1); // in "America/Los_Angeles"
1268-
System.out.println("Server time: " + serverTime);
12691315
LocalDateTime serverUtcTime = reader.getLocalDateTime(2);
1270-
System.out.println("Server UTC time: " + serverUtcTime);
12711316
ZonedDateTime serverLisbonTime = reader.getZonedDateTime(3); // in "Europe/Lisbon"
1272-
System.out.println("Server Lisbon time: " + serverLisbonTime);
12731317

12741318
ZonedDateTime serverTimeZ = serverTime.atZone(ZoneId.of("America/Los_Angeles"));
12751319
ZonedDateTime serverUtcTimeZ = serverUtcTime.atZone(ZoneId.of("UTC"));

0 commit comments

Comments
 (0)