Skip to content

Commit 96102c9

Browse files
committed
implemented reading in rowbinary format for json and dynamic
1 parent f0645b1 commit 96102c9

File tree

2 files changed

+76
-11
lines changed

2 files changed

+76
-11
lines changed

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.ZonedDateTime;
2323
import java.util.ArrayList;
2424
import java.util.Collections;
25+
import java.util.HashMap;
2526
import java.util.LinkedHashMap;
2627
import java.util.List;
2728
import java.util.Map;
@@ -92,7 +93,7 @@ public <T> T readValue(ClickHouseColumn column, Class<?> typeHint) throws IOExce
9293
}
9394
}
9495

95-
ClickHouseDataType dataType = column.getDataType() == ClickHouseDataType.Dynamic ? readDynamicValueDataType() : column.getDataType();
96+
ClickHouseDataType dataType = column.getDataType() == ClickHouseDataType.Dynamic ? readDynamicData() : column.getDataType();
9697
int estimatedLen = column.getEstimatedLength();
9798
int precision = column.getPrecision();
9899
int scale = column.getScale();
@@ -199,7 +200,7 @@ public <T> T readValue(ClickHouseColumn column, Class<?> typeHint) throws IOExce
199200
if (jsonAsString) {
200201
return (T) readString(input);
201202
} else {
202-
throw new RuntimeException("Reading JSON from binary is not implemented yet");
203+
return (T) readJsonData(input);
203204
}
204205
// case Object: // deprecated https://clickhouse.com/docs/en/sql-reference/data-types/object-data-type
205206
case Array:
@@ -986,7 +987,7 @@ public byte[] allocate(int size) {
986987
}
987988
}
988989

989-
private ClickHouseDataType readDynamicValueDataType() throws IOException {
990+
private ClickHouseDataType readDynamicData() throws IOException {
990991
byte tag = readByte();
991992

992993
ClickHouseDataType type;
@@ -1005,4 +1006,21 @@ private ClickHouseDataType readDynamicValueDataType() throws IOException {
10051006

10061007
return type;
10071008
}
1009+
1010+
private static final ClickHouseColumn JSON_PLACEHOLDER_COL = ClickHouseColumn.parse("v Dynamic").get(0);
1011+
1012+
private Map<String, Object> readJsonData(InputStream input) throws IOException {
1013+
int numOfPaths = readVarInt(input);
1014+
if (numOfPaths == 0) {
1015+
return Collections.emptyMap();
1016+
}
1017+
1018+
Map<String, Object> obj = new HashMap<>();
1019+
for (int i = 0; i < numOfPaths; i++) {
1020+
String path = readString(input);
1021+
Object value = readValue(JSON_PLACEHOLDER_COL);
1022+
obj.put(path, value);
1023+
}
1024+
return obj;
1025+
}
10081026
}

client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,11 @@
8080
import java.util.concurrent.Executors;
8181
import java.util.concurrent.Future;
8282
import java.util.concurrent.TimeUnit;
83+
import java.util.concurrent.atomic.AtomicInteger;
8384
import java.util.function.Consumer;
8485
import java.util.function.Function;
8586
import java.util.function.Supplier;
87+
import java.util.regex.Matcher;
8688
import java.util.stream.BaseStream;
8789
import java.util.stream.Collectors;
8890
import java.util.stream.IntStream;
@@ -1420,7 +1422,10 @@ private List<Map<String, Object>> prepareDataSet(String table, List<String> colu
14201422
}
14211423
createStmtBuilder.setLength(createStmtBuilder.length() - 2);
14221424
createStmtBuilder.append(") ENGINE = MergeTree ORDER BY tuple()");
1423-
client.execute(createStmtBuilder.toString()).get(10, TimeUnit.SECONDS);
1425+
client.execute(createStmtBuilder.toString(), (CommandSettings)
1426+
new CommandSettings().serverSetting("enable_dynamic_type", "1")
1427+
.serverSetting("allow_experimental_json_type", "1"))
1428+
.get(10, TimeUnit.SECONDS);
14241429

14251430
// Insert data
14261431
StringBuilder insertStmtBuilder = new StringBuilder();
@@ -2058,17 +2063,59 @@ public void testGettingRowsBeforeLimit() throws Exception {
20582063
@Test(groups = {"integration"})
20592064
public void testGetDynamicValue() throws Exception {
20602065
String table = "test_get_dynamic_values";
2061-
client.execute("DROP TABLE IF EXISTS " + table);
2062-
client.execute("CREATE TABLE " + table + " (rowId Int32, v Dynamic) Engine MergeTree ORDER BY ()", (CommandSettings) new CommandSettings().serverSetting("enable_dynamic_type", "1"));
20632066

2064-
client.execute("INSERT INTO " + table + " VALUES (0, 'string'), (1, 2222222)");
2067+
final AtomicInteger rowId = new AtomicInteger(-1);
2068+
final Random rnd = new Random();
2069+
2070+
List<Map<String,Object>> dataset = prepareDataSet(table, Arrays.asList("rowId Int32", "v Dynamic"),
2071+
Arrays.asList(s -> rowId.incrementAndGet(), s-> {
2072+
int decision = rnd.nextInt(3);
2073+
if (decision == 0) {
2074+
return RandomStringUtils.randomAlphanumeric(3, 10);
2075+
} else if (decision == 1) {
2076+
return rnd.nextInt();
2077+
} else {
2078+
return rnd.nextDouble();
2079+
}
2080+
}), 1000);
20652081

20662082
try (QueryResponse response = client.query("SELECT * FROM " + table).get()) {
20672083
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);
2068-
reader.next();
2069-
System.out.println(reader.getString("v"));
2070-
reader.next();
2071-
System.out.println(reader.getString("v"));
2084+
while (reader.next() != null) {
2085+
int rowIndex = reader.getInteger("rowId");
2086+
Assert.assertEquals(reader.getString("v"), dataset.get(rowIndex).get("v").toString());
2087+
}
2088+
}
2089+
}
2090+
2091+
@Test(groups = {"integration"})
2092+
public void testGetJSON() throws Exception {
2093+
String table = "test_get_json_values";
2094+
2095+
final AtomicInteger rowId = new AtomicInteger(-1);
2096+
final Random rnd = new Random();
2097+
2098+
List<Map<String,Object>> dataset = prepareDataSet(table, Arrays.asList("rowId Int32", "v1 JSON"),
2099+
Arrays.asList(s -> rowId.incrementAndGet(),
2100+
s-> {
2101+
String a = "{'a': '" + RandomStringUtils.randomAlphabetic(20) + "', 'b': { 'c': 'test1', 'd': " + rnd
2102+
.nextInt(1000) + "}}";
2103+
return a.replaceAll("'", "\"");
2104+
}), 1);
2105+
2106+
System.out.println(dataset);
2107+
ObjectMapper jackson = new ObjectMapper();
2108+
try (QueryResponse response = client.query("SELECT * FROM " + table).get()) {
2109+
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);
2110+
while (reader.next() != null) {
2111+
int rowIndex = reader.getInteger("rowId");
2112+
JsonNode expected = jackson.readValue(dataset.get(rowIndex).get("v1").toString(), JsonNode.class);
2113+
Map<String, Object> v1 = reader.readValue("v1");
2114+
for (Map.Entry<String, Object> e : v1.entrySet()) {
2115+
String pointer = "/" + e.getKey().replaceAll("\\.", "/");
2116+
Assert.assertEquals(e.getValue().toString(), expected.at(pointer).asText());
2117+
}
2118+
}
20722119
}
20732120
}
20742121
}

0 commit comments

Comments
 (0)