Skip to content

Commit 9045eda

Browse files
committed
fixed serialization of null values
1 parent 28de7cd commit 9045eda

File tree

6 files changed

+300
-59
lines changed

6 files changed

+300
-59
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.clickhouse.client.http.ClickHouseHttpProto;
4343
import com.clickhouse.client.http.config.ClickHouseHttpOption;
4444
import com.clickhouse.data.ClickHouseColumn;
45+
import com.clickhouse.data.ClickHouseDataType;
4546
import com.clickhouse.data.ClickHouseFormat;
4647
import com.clickhouse.data.format.BinaryStreamUtils;
4748
import org.apache.hc.client5.http.ConnectTimeoutException;
@@ -50,6 +51,7 @@
5051
import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
5152
import org.apache.hc.core5.http.HttpStatus;
5253
import org.apache.hc.core5.http.NoHttpResponseException;
54+
import org.objectweb.asm.Type;
5355
import org.slf4j.Logger;
5456
import org.slf4j.LoggerFactory;
5557

@@ -106,9 +108,9 @@
106108
*
107109
* if (client.ping()) {
108110
* QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
109-
* Future<QueryResponse> response = client.query("SELECT * FROM " + table, settings);
110-
* QueryResponse queryResponse = response.get();
111-
* }
111+
* try (QueryResponse response = client.query("SELECT * FROM " + table, settings).get(10, TimeUnit.SECONDS)) {
112+
* ...
113+
* }
112114
* }
113115
* </pre>
114116
*
@@ -122,7 +124,7 @@ public class Client implements AutoCloseable {
122124
private final Set<String> endpoints;
123125
private final Map<String, String> configuration;
124126
private final List<ClickHouseNode> serverNodes = new ArrayList<>();
125-
private final Map<Class<?>, List<POJOSerializer>> serializers; //Order is important to preserve for RowBinary
127+
private final Map<Class<?>, Map<String, POJOSerializer>> serializers;
126128
private final Map<Class<?>, Map<String, Method>> getterMethods;
127129

128130
private final Map<Class<?>, Map<String, POJOSetter>> deserializers;
@@ -138,6 +140,8 @@ public class Client implements AutoCloseable {
138140

139141
private ClickHouseClient oldClient = null;
140142

143+
private Map<String, TableSchema> tableSchemaCache = new ConcurrentHashMap<>();
144+
141145
private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation,
142146
ExecutorService sharedOperationExecutor) {
143147
this.endpoints = endpoints;
@@ -929,12 +933,15 @@ public boolean ping(long timeout) {
929933

930934
/**
931935
* <p>Registers a POJO class and maps its fields to a table schema</p>
936+
* <p>Note: table schema will be stored in cache to be used while other operations. Call this method
937+
* to update cache.</p>
932938
*
933939
* @param clazz - class of a POJO
934940
* @param schema - correlating table schema
935941
*/
936942
public synchronized void register(Class<?> clazz, TableSchema schema) {
937943
LOG.debug("Registering POJO: {}", clazz.getName());
944+
tableSchemaCache.put(schema.getTableName(), schema);
938945

939946
//Create a new POJOSerializer with static .serialize(object, columns) methods
940947
Map<String, Method> classGetters = new HashMap<>();
@@ -956,34 +963,49 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {
956963
this.setterMethods.put(clazz, classSetters);//Store the setter methods for later use
957964
this.hasDefaults.put(clazz, schema.hasDefaults());
958965

959-
List<POJOSerializer> classSerializers = new ArrayList<>();
966+
Map<String, POJOSerializer> classSerializers = new HashMap<>();
960967
Map<String, POJOSetter> classDeserializers = new ConcurrentHashMap<>();
961968
for (ClickHouseColumn column : schema.getColumns()) {
962969
String propertyName = column.getColumnName().toLowerCase().replace("_", "").replace(".", "");
963970

964971
Method getterMethod = classGetters.get(propertyName);
965-
boolean classHashDefaults = this.hasDefaults.get(clazz);
972+
boolean defaultsSupport = this.hasDefaults.get(clazz);
966973
if (getterMethod != null) {
967-
classSerializers.add((obj, stream) -> {
974+
classSerializers.put(schema.getTableName() + "." + column.getColumnName(), (obj, stream) -> {
968975
Object value = getterMethod.invoke(obj);
969976

970-
//Handle null values
971-
if (value == null) {
972-
if (classHashDefaults && !column.hasDefault()) {//Send this only if there is no default
977+
if (defaultsSupport) {
978+
if (value != null) {//Because we now support defaults, we have to send nonNull
979+
BinaryStreamUtils.writeNonNull(stream);//Write 0 for no default
980+
981+
if (column.isNullable()) {//If the column is nullable
982+
BinaryStreamUtils.writeNonNull(stream);//Write 0 for not null
983+
}
984+
} else {//So if the object is null
985+
if (column.hasDefault()) {
986+
BinaryStreamUtils.writeNull(stream);//Send 1 for default
987+
return;
988+
} else if (column.isNullable()) {//And the column is nullable
989+
BinaryStreamUtils.writeNonNull(stream);
990+
BinaryStreamUtils.writeNull(stream);//Then we send null, write 1
991+
return;//And we're done
992+
} else if (column.getDataType() == ClickHouseDataType.Array) {//If the column is an array
993+
BinaryStreamUtils.writeNonNull(stream);//Then we send nonNull
994+
} else {
995+
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column.getColumnName()));
996+
}
997+
}
998+
} else {
999+
// If column is nullable && the object is also null add the not null marker
1000+
if (column.isNullable() && value != null) {
9731001
BinaryStreamUtils.writeNonNull(stream);
9741002
}
975-
BinaryStreamUtils.writeNull(stream);//We send this regardless of default or nullable
976-
return;
977-
}
978-
979-
//Handle default
980-
if (classHashDefaults) {
981-
BinaryStreamUtils.writeNonNull(stream);//Write 0
982-
}
983-
984-
//Handle nullable
985-
if (column.isNullable()) {
986-
BinaryStreamUtils.writeNonNull(stream);//Write 0
1003+
if (!column.isNullable() && value == null) {
1004+
if (column.getDataType() == ClickHouseDataType.Array)
1005+
BinaryStreamUtils.writeNonNull(stream);
1006+
else
1007+
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column.getColumnName()));
1008+
}
9871009
}
9881010

9891011
//Handle the different types
@@ -993,6 +1015,7 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {
9931015
LOG.warn("No getter method found for column: {}", propertyName);
9941016
}
9951017

1018+
// Deserialization stuff
9961019
Method setterMethod = classSetters.get(propertyName);
9971020
String columnName = column.getColumnName();
9981021
if (setterMethod != null) {
@@ -1051,13 +1074,6 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10511074
throw new IllegalArgumentException("Data cannot be empty");
10521075
}
10531076

1054-
//Lookup the Serializer for the POJO
1055-
List<POJOSerializer> serializers = this.serializers.get(data.get(0).getClass());
1056-
if (serializers == null || serializers.isEmpty()) {
1057-
throw new IllegalArgumentException("No serializers found for class '" + data.get(0).getClass() + "'. Did you forget to register it?");
1058-
} else {
1059-
LOG.info("serializers: {}", serializers.size());
1060-
}
10611077

10621078
String operationId = startOperation();
10631079
settings.setOperationId(operationId);
@@ -1073,6 +1089,22 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10731089

10741090
boolean hasDefaults = this.hasDefaults.get(data.get(0).getClass());
10751091
ClickHouseFormat format = hasDefaults? ClickHouseFormat.RowBinaryWithDefaults : ClickHouseFormat.RowBinary;
1092+
TableSchema tableSchema = tableSchemaCache.get(tableName);
1093+
if (tableSchema == null) {
1094+
tableSchema = getTableSchema(tableName);
1095+
}
1096+
//Lookup the Serializer for the POJO
1097+
Map<String, POJOSerializer> classSerializers = serializers.getOrDefault(data.get(0).getClass(), Collections.emptyMap());
1098+
List<POJOSerializer> serializersForTable = new ArrayList<>();
1099+
for (ClickHouseColumn column : tableSchema.getColumns()) {
1100+
POJOSerializer serializer = classSerializers.get(tableName + "." + column.getColumnName());
1101+
if (serializer == null) {
1102+
throw new IllegalArgumentException("No serializer found for column '" + column.getColumnName() + "'. Did you forget to register it?");
1103+
}
1104+
System.out.println("Serializer: " + serializer.toString() + " for column: " + column.getColumnName());
1105+
serializersForTable.add(serializer);
1106+
}
1107+
10761108

10771109
if (useNewImplementation) {
10781110
String retry = configuration.get(ClickHouseClientOption.RETRY.getKey());
@@ -1096,7 +1128,9 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10961128
out.write(format.name().getBytes());
10971129
out.write(" \n".getBytes());
10981130
for (Object obj : data) {
1099-
for (POJOSerializer serializer : serializers) {
1131+
1132+
for (POJOSerializer serializer : serializersForTable) {
1133+
System.out.println("Serializer: " + serializer.toString());
11001134
try {
11011135
serializer.serialize(obj, out);
11021136
} catch (InvocationTargetException | IllegalAccessException | IOException e) {
@@ -1145,7 +1179,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
11451179

11461180
//Call the static .serialize method on the POJOSerializer for each object in the list
11471181
for (Object obj : data) {
1148-
for (POJOSerializer serializer : serializers) {
1182+
for (POJOSerializer serializer : serializersForTable) {
11491183
try {
11501184
serializer.serialize(obj, stream);
11511185
} catch (InvocationTargetException | IllegalAccessException | IOException e) {
@@ -1595,13 +1629,12 @@ public <T> List<T> queryAll(String sqlQuery, Class<T> clazz, Supplier<T> allocat
15951629
* @return {@code TableSchema} - Schema of the table
15961630
*/
15971631
public TableSchema getTableSchema(String table) {
1598-
return getTableSchema(table, configuration.get("database"));
1632+
return getTableSchema(table, getDefaultDatabase());
15991633
}
16001634

16011635
/**
16021636
* <p>Fetches schema of a table and returns complete information about each column.
16031637
* Information includes column name, type, default value, etc.</p>
1604-
*
16051638
* <p>See {@link #register(Class, TableSchema)}</p>
16061639
*
16071640
* @param table - table name
@@ -1610,29 +1643,26 @@ public TableSchema getTableSchema(String table) {
16101643
*/
16111644
public TableSchema getTableSchema(String table, String database) {
16121645
final String sql = "DESCRIBE TABLE " + table + " FORMAT " + ClickHouseFormat.TSKV.name();
1613-
1614-
int operationTimeout = getOperationTimeout();
1615-
1616-
try (QueryResponse response = operationTimeout == 0 ? query(sql).get() :
1617-
query(sql).get(getOperationTimeout(), TimeUnit.SECONDS)) {
1618-
return new TableSchemaParser().readTSKV(response.getInputStream(), table, database);
1619-
} catch (TimeoutException e) {
1620-
throw new ClientException("Operation has likely timed out after " + getOperationTimeout() + " seconds.", e);
1621-
} catch (ExecutionException e) {
1622-
throw new ClientException("Failed to get table schema", e.getCause());
1623-
} catch (Exception e) {
1624-
throw new ClientException("Failed to get table schema", e);
1625-
}
1646+
return getTableSchemaImpl(sql, table, database);
16261647
}
16271648

1628-
public TableSchema getTableSchemaFromQuery(String sql, String name) {
1649+
/**
1650+
* <p>Creates table schema from a query.</p>
1651+
* <p>Note: this method will no cache table schema </p>
1652+
* @param sql - SQL query which schema to return
1653+
* @return
1654+
*/
1655+
public TableSchema getTableSchemaFromQuery(String sql) {
16291656
final String describeQuery = "DESC (" + sql + ") FORMAT " + ClickHouseFormat.TSKV.name();
1657+
return getTableSchemaImpl(describeQuery, UUID.randomUUID().toString(), getDefaultDatabase());
1658+
}
16301659

1660+
private TableSchema getTableSchemaImpl(String describeQuery, String name, String database) {
16311661
int operationTimeout = getOperationTimeout();
16321662

16331663
try (QueryResponse response = operationTimeout == 0 ? query(describeQuery).get() :
16341664
query(describeQuery).get(getOperationTimeout(), TimeUnit.SECONDS)) {
1635-
return new TableSchemaParser().readTSKV(response.getInputStream(), name, getDefaultDatabase());
1665+
return new TableSchemaParser().readTSKV(response.getInputStream(), name, database);
16361666
} catch (TimeoutException e) {
16371667
throw new ClientException("Operation has likely timed out after " + getOperationTimeout() + " seconds.", e);
16381668
} catch (ExecutionException e) {

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import com.clickhouse.client.api.ClientException;
44
import com.clickhouse.data.ClickHouseColumn;
55
import com.clickhouse.data.ClickHouseDataType;
6+
import com.clickhouse.data.format.BinaryStreamUtils;
7+
import com.clickhouse.data.value.ClickHouseBitmap;
68
import org.slf4j.Logger;
79
import org.slf4j.helpers.NOPLogger;
810

@@ -195,7 +197,8 @@ private <T> T readValueImpl(ClickHouseColumn column) throws IOException {
195197
case Nothing:
196198
return null;
197199
// case SimpleAggregateFunction:
198-
// case AggregateFunction:
200+
case AggregateFunction:
201+
return (T) ClickHouseBitmap.deserialize(input, column.getNestedColumns().get(0).getDataType());
199202
default:
200203
throw new IllegalArgumentException("Unsupported data type: " + column.getDataType());
201204
}

client-v2/src/main/java/com/clickhouse/client/api/internal/SerializerUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.clickhouse.client.api.Client;
44
import com.clickhouse.client.api.ClientException;
55
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
6+
import com.clickhouse.client.api.insert.POJOSerializer;
67
import com.clickhouse.client.api.query.POJOSetter;
78
import com.clickhouse.data.ClickHouseAggregateFunction;
89
import com.clickhouse.data.ClickHouseColumn;

client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@
88
import com.clickhouse.client.ClickHouseNodeSelector;
99
import com.clickhouse.client.ClickHouseProtocol;
1010
import com.clickhouse.client.api.Client;
11+
import com.clickhouse.client.api.ClientException;
12+
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
1113
import com.clickhouse.client.api.enums.Protocol;
1214
import com.clickhouse.client.api.insert.InsertResponse;
1315
import com.clickhouse.client.api.insert.InsertSettings;
1416
import com.clickhouse.client.api.metrics.ClientMetrics;
1517
import com.clickhouse.client.api.metrics.OperationMetrics;
1618
import com.clickhouse.client.api.metrics.ServerMetrics;
1719
import com.clickhouse.client.api.query.GenericRecord;
20+
import com.clickhouse.client.api.query.QueryResponse;
1821
import com.clickhouse.client.config.ClickHouseClientOption;
1922
import com.clickhouse.data.ClickHouseFormat;
2023
import com.github.tomakehurst.wiremock.WireMockServer;
@@ -66,7 +69,7 @@ public void setUp() throws IOException {
6669
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), false)
6770
.setUsername("default")
6871
.setPassword("")
69-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
72+
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
7073
.compressClientRequest(useClientCompression)
7174
.useHttpCompression(useHttpCompression)
7275
.build();
@@ -123,6 +126,66 @@ public void insertSimplePOJOs() throws Exception {
123126
assertEquals(response.getQueryId(), uuid);
124127
}
125128

129+
static {
130+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
131+
}
132+
133+
@Test(groups = { "integration" }, enabled = true)
134+
public void insertPOJOAndReadBack() throws Exception {
135+
final String tableName = "single_pojo_table";
136+
final String createSQL = SamplePOJO.generateTableCreateSQL(tableName);
137+
final SamplePOJO pojo = new SamplePOJO();
138+
139+
dropTable(tableName);
140+
createTable(createSQL);
141+
client.register(SamplePOJO.class, client.getTableSchema(tableName, "default"));
142+
143+
System.out.println("Inserting POJO: " + pojo);
144+
try (InsertResponse response = client.insert(tableName, Collections.singletonList(pojo), settings).get(30, TimeUnit.SECONDS)) {
145+
Assert.assertEquals(response.getWrittenRows(), 1);
146+
}
147+
148+
// try (QueryResponse queryResponse =
149+
// client.query("SELECT * FROM " + tableName + " LIMIT 1").get(30, TimeUnit.SECONDS)) {
150+
//
151+
// ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(queryResponse);
152+
// Assert.assertNotNull(reader.next());
153+
//
154+
// Assert.assertEquals(reader.getByte("byteValue"), pojo.getByteValue());
155+
// Assert.assertEquals(reader.getByte("int8"), pojo.getInt8());
156+
// Assert.assertEquals(reader.getShort("uint8"), pojo.getUint8());
157+
// Assert.assertEquals(reader.getShort("int16"), pojo.getInt16());
158+
// Assert.assertEquals(reader.getInteger("int32"), pojo.getInt32());
159+
// Assert.assertEquals(reader.getLong("int64"), pojo.getInt64());
160+
// Assert.assertEquals(reader.getFloat("float32"), pojo.getFloat32());
161+
// Assert.assertEquals(reader.getDouble("float64"), pojo.getFloat64());
162+
// Assert.assertEquals(reader.getString("string"), pojo.getString());
163+
// Assert.assertEquals(reader.getString("fixedString"), pojo.getFixedString());
164+
// }
165+
}
166+
167+
@Test
168+
public void testInsertingPOJOWithNullValueForNonNullableColumn() throws Exception {
169+
final String tableName = "single_pojo_table";
170+
final String createSQL = SamplePOJO.generateTableCreateSQL(tableName);
171+
final SamplePOJO pojo = new SamplePOJO();
172+
173+
pojo.setBoxedByte(null);
174+
175+
dropTable(tableName);
176+
createTable(createSQL);
177+
client.register(SamplePOJO.class, client.getTableSchema(tableName, "default"));
178+
179+
180+
181+
try (InsertResponse response = client.insert(tableName, Collections.singletonList(pojo), settings).get(30, TimeUnit.SECONDS)) {
182+
fail("Should have thrown an exception");
183+
} catch (ClientException e) {
184+
e.printStackTrace();
185+
assertTrue(e.getCause() instanceof IllegalArgumentException);
186+
}
187+
}
188+
126189
@Test(groups = { "integration" }, enabled = true)
127190
public void insertRawData() throws Exception {
128191
final String tableName = "raw_data_table";

0 commit comments

Comments
 (0)