2020import com .clickhouse .client .api .insert .InsertResponse ;
2121import com .clickhouse .client .api .insert .InsertSettings ;
2222import com .clickhouse .client .api .insert .POJOSerializer ;
23- import com .clickhouse .client .api .insert .SerializerNotFoundException ;
24- import com .clickhouse .client .api .internal .BasicObjectsPool ;
2523import com .clickhouse .client .api .internal .ClickHouseLZ4OutputStream ;
2624import com .clickhouse .client .api .internal .ClientStatisticsHolder ;
2725import com .clickhouse .client .api .internal .ClientV1AdaptorHelper ;
3533import com .clickhouse .client .api .metrics .ClientMetrics ;
3634import com .clickhouse .client .api .metrics .OperationMetrics ;
3735import com .clickhouse .client .api .query .GenericRecord ;
36+ import com .clickhouse .client .api .query .POJOSetter ;
3837import com .clickhouse .client .api .query .QueryResponse ;
3938import com .clickhouse .client .api .query .QuerySettings ;
4039import com .clickhouse .client .api .query .Records ;
7069import java .util .HashMap ;
7170import java .util .HashSet ;
7271import java .util .LinkedHashMap ;
73- import java .util .LinkedList ;
7472import java .util .List ;
7573import java .util .Map ;
7674import java .util .Set ;
@@ -126,6 +124,10 @@ public class Client implements AutoCloseable {
126124 private final List <ClickHouseNode > serverNodes = new ArrayList <>();
127125 private final Map <Class <?>, List <POJOSerializer >> serializers ; //Order is important to preserve for RowBinary
128126 private final Map <Class <?>, Map <String , Method >> getterMethods ;
127+
128+ private final Map <Class <?>, Map <String , POJOSetter >> deserializers ;
129+ private final Map <Class <?>, Map <String , Method >> setterMethods ;
130+
129131 private final Map <Class <?>, Boolean > hasDefaults ; // Whether the POJO has defaults
130132 private static final Logger LOG = LoggerFactory .getLogger (Client .class );
131133 private final ExecutorService sharedOperationExecutor ;
@@ -143,9 +145,11 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
143145 this .endpoints .forEach (endpoint -> {
144146 this .serverNodes .add (ClickHouseNode .of (endpoint , this .configuration ));
145147 });
146- this .serializers = new HashMap <>();
147- this .getterMethods = new HashMap <>();
148- this .hasDefaults = new HashMap <>();
148+ this .serializers = new ConcurrentHashMap <>();
149+ this .getterMethods = new ConcurrentHashMap <>();
150+ this .hasDefaults = new ConcurrentHashMap <>();
151+ this .deserializers = new ConcurrentHashMap <>();
152+ this .setterMethods = new ConcurrentHashMap <>();
149153
150154 boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClickHouseClientOption .ASYNC .getKey ());
151155 if (isAsyncEnabled && sharedOperationExecutor == null ) {
@@ -929,61 +933,76 @@ public boolean ping(long timeout) {
929933 * @param clazz - class of a POJO
930934 * @param schema - correlating table schema
931935 */
932- public void register (Class <?> clazz , TableSchema schema ) {
936+ public synchronized void register (Class <?> clazz , TableSchema schema ) {
933937 LOG .debug ("Registering POJO: {}" , clazz .getName ());
934938
935939 //Create a new POJOSerializer with static .serialize(object, columns) methods
936- List <POJOSerializer > serializers = new ArrayList <>();
937- Map <String , Method > getterMethods = new HashMap <>();
938-
939- for (Method method : clazz .getMethods ()) {//Clean up the method names
940+ Map <String , Method > classGetters = new HashMap <>();
941+ Map <String , Method > classSetters = new HashMap <>();
942+ for (Method method : clazz .getMethods ()) {//Clean up the method names
940943 String methodName = method .getName ();
941944 if (methodName .startsWith ("get" ) || methodName .startsWith ("has" )) {
942945 methodName = methodName .substring (3 ).toLowerCase ();
943- getterMethods .put (methodName , method );
944- } if (methodName .startsWith ("is" )) {
946+ classGetters .put (methodName , method );
947+ } else if (methodName .startsWith ("is" )) {
945948 methodName = methodName .substring (2 ).toLowerCase ();
946- getterMethods .put (methodName , method );
949+ classGetters .put (methodName , method );
950+ } else if (methodName .startsWith ("set" )) {
951+ methodName = methodName .substring (3 ).toLowerCase ();
952+ classSetters .put (methodName , method );
947953 }
948954 }
949- this .getterMethods .put (clazz , getterMethods );//Store the getter methods for later use
955+ this .getterMethods .put (clazz , classGetters );//Store the getter methods for later use
956+ this .setterMethods .put (clazz , classSetters );//Store the setter methods for later use
957+ this .hasDefaults .put (clazz , schema .hasDefaults ());
950958
959+ List <POJOSerializer > classSerializers = new ArrayList <>();
960+ Map <String , POJOSetter > classDeserializers = new ConcurrentHashMap <>();
951961 for (ClickHouseColumn column : schema .getColumns ()) {
952- String columnName = column .getColumnName ().toLowerCase ().replace ("_" , "" ).replace ("." ,"" );
953- serializers .add ((obj , stream ) -> {
954- if (!getterMethods .containsKey (columnName )) {
955- LOG .warn ("No getter method found for column: {}" , columnName );
956- return ;
957- }
958- Method getterMethod = this .getterMethods .get (clazz ).get (columnName );
959- Object value = getterMethod .invoke (obj );
960- boolean hasDefaults = this .hasDefaults .get (clazz );
961-
962- //Handle null values
963- if (value == null ) {
964- if (hasDefaults && !column .hasDefault ()) {//Send this only if there is no default
965- BinaryStreamUtils .writeNonNull (stream );
962+ String propertyName = column .getColumnName ().toLowerCase ().replace ("_" , "" ).replace ("." , "" );
963+
964+ Method getterMethod = classGetters .get (propertyName );
965+ boolean classHashDefaults = this .hasDefaults .get (clazz );
966+ if (getterMethod != null ) {
967+ classSerializers .add ((obj , stream ) -> {
968+ Object value = getterMethod .invoke (obj );
969+
970+ //Handle null values
971+ if (value == null ) {
972+ if (classHashDefaults && !column .hasDefault ()) {//Send this only if there is no default
973+ BinaryStreamUtils .writeNonNull (stream );
974+ }
975+ BinaryStreamUtils .writeNull (stream );//We send this regardless of default or nullable
976+ return ;
966977 }
967- BinaryStreamUtils .writeNull (stream );//We send this regardless of default or nullable
968- return ;
969- }
970978
971- //Handle default
972- if (hasDefaults ) {
973- BinaryStreamUtils .writeNonNull (stream );//Write 0
974- }
979+ //Handle default
980+ if (classHashDefaults ) {
981+ BinaryStreamUtils .writeNonNull (stream );//Write 0
982+ }
975983
976- //Handle nullable
977- if (column .isNullable ()) {
978- BinaryStreamUtils .writeNonNull (stream );//Write 0
979- }
984+ //Handle nullable
985+ if (column .isNullable ()) {
986+ BinaryStreamUtils .writeNonNull (stream );//Write 0
987+ }
980988
981- //Handle the different types
982- SerializerUtils .serializeData (stream , value , column );
983- });
989+ //Handle the different types
990+ SerializerUtils .serializeData (stream , value , column );
991+ });
992+ } else {
993+ LOG .warn ("No getter method found for column: {}" , propertyName );
994+ }
995+
996+ Method setterMethod = classSetters .get (propertyName );
997+ String columnName = column .getColumnName ();
998+ if (setterMethod != null ) {
999+ classDeserializers .put (columnName , SerializerUtils .compilePOJOSetter (setterMethod , column ));
1000+ } else {
1001+ LOG .warn ("No setter method found for column: {}" , propertyName );
1002+ }
9841003 }
985- this .serializers .put (clazz , serializers );
986- this .hasDefaults .put (clazz , schema . hasDefaults () );
1004+ this .serializers .put (clazz , classSerializers );
1005+ this .deserializers .put (clazz , classDeserializers );
9871006 }
9881007
9891008 /**
@@ -1023,21 +1042,30 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data)
10231042 * @param tableName - destination table name
10241043 * @param data - data stream to insert
10251044 * @param settings - insert operation settings
1045+ * @throws IllegalArgumentException when data is empty or not registered
10261046 * @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
10271047 */
10281048 public CompletableFuture <InsertResponse > insert (String tableName , List <?> data , InsertSettings settings ) {
10291049
1050+ if (data == null || data .isEmpty ()) {
1051+ throw new IllegalArgumentException ("Data cannot be empty" );
1052+ }
1053+
1054+ //Lookup the Serializer for the POJO
1055+ List <POJOSerializer > serializers = this .serializers .get (data .get (0 ).getClass ());
1056+ if (serializers == null || serializers .isEmpty ()) {
1057+ throw new IllegalArgumentException ("No serializers found for class '" + data .get (0 ).getClass () + "'. Did you forget to register it?" );
1058+ } else {
1059+ LOG .info ("serializers: {}" , serializers .size ());
1060+ }
1061+
10301062 String operationId = startOperation ();
10311063 settings .setOperationId (operationId );
10321064 if (useNewImplementation ) {
10331065 globalClientStats .get (operationId ).start (ClientMetrics .OP_DURATION );
10341066 }
10351067 globalClientStats .get (operationId ).start (ClientMetrics .OP_SERIALIZATION );
10361068
1037- if (data == null || data .isEmpty ()) {
1038- throw new IllegalArgumentException ("Data cannot be empty" );
1039- }
1040-
10411069 //Add format to the settings
10421070 if (settings == null ) {
10431071 settings = new InsertSettings ();
@@ -1046,12 +1074,6 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10461074 boolean hasDefaults = this .hasDefaults .get (data .get (0 ).getClass ());
10471075 ClickHouseFormat format = hasDefaults ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
10481076
1049- //Lookup the Serializer for the POJO
1050- List <POJOSerializer > serializers = this .serializers .get (data .get (0 ).getClass ());
1051- if (serializers == null || serializers .isEmpty ()) {
1052- throw new SerializerNotFoundException (data .get (0 ).getClass ());
1053- }
1054-
10551077 if (useNewImplementation ) {
10561078 String retry = configuration .get (ClickHouseClientOption .RETRY .getKey ());
10571079 final int maxRetries = retry == null ? (int ) ClickHouseClientOption .RETRY .getDefaultValue () : Integer .parseInt (retry );
@@ -1508,6 +1530,61 @@ public List<GenericRecord> queryAll(String sqlQuery) {
15081530 }
15091531 }
15101532
1533+ public <T > List <T > queryAll (String sqlQuery , Class <T > clazz ) {
1534+ return queryAll (sqlQuery , clazz , null );
1535+ }
1536+
1537+ /**
1538+ * WARNING: Experimental API
1539+ *
1540+ * <p>Queries data and returns collection with whole result. Data is read directly to a DTO
1541+ * to save memory on intermediate structures. DTO will be instantiated with default constructor or
1542+ * by using allocator</p>
1543+ * <p>{@code class} should be registered before calling this method using {@link #register(Class, TableSchema)}</p>
1544+ * <p>Internally deserializer is compiled at the register stage. Compilation is done using ASM library by
1545+ * writing a bytecode</p>
1546+ *
1547+ * @param sqlQuery - query to execute
1548+ * @param clazz - class of the DTO
1549+ * @param allocator - optional supplier to create new instances of the DTO.
1550+ * @throws IllegalArgumentException when class is not registered or no setters found
1551+ * @return List of POJOs filled with data
1552+ * @param <T>
1553+ */
1554+ public <T > List <T > queryAll (String sqlQuery , Class <T > clazz , Supplier <T > allocator ) {
1555+ Map <String , POJOSetter > classDeserializers = deserializers .get (clazz );
1556+
1557+ if (classDeserializers == null || classDeserializers .isEmpty ()) {
1558+ throw new IllegalArgumentException ("No deserializers found for class '" + clazz + "'. Did you forget to register it?" );
1559+ }
1560+
1561+ try {
1562+ int operationTimeout = getOperationTimeout ();
1563+ QuerySettings settings = new QuerySettings ().setFormat (ClickHouseFormat .RowBinaryWithNamesAndTypes );
1564+ try (QueryResponse response = operationTimeout == 0 ? query (sqlQuery , settings ).get () :
1565+ query (sqlQuery , settings ).get (operationTimeout , TimeUnit .MILLISECONDS )) {
1566+ List <T > records = new ArrayList <>();
1567+ RowBinaryWithNamesAndTypesFormatReader reader =
1568+ (RowBinaryWithNamesAndTypesFormatReader ) newBinaryFormatReader (response );
1569+
1570+ while (true ) {
1571+ Object record = allocator == null ? clazz .getDeclaredConstructor ().newInstance () : allocator .get ();
1572+ if (reader .readToPOJO (classDeserializers , record )) {
1573+ records .add ((T ) record );
1574+ } else {
1575+ break ;
1576+ }
1577+ }
1578+
1579+ return records ;
1580+ }
1581+ } catch (ExecutionException e ) {
1582+ throw new ClientException ("Failed to get query response" , e .getCause ());
1583+ } catch (Exception e ) {
1584+ throw new ClientException ("Failed to get query response" , e );
1585+ }
1586+ }
1587+
15111588 /**
15121589 * <p>Fetches schema of a table and returns complete information about each column.
15131590 * Information includes column name, type, default value, etc.</p>
@@ -1548,6 +1625,23 @@ public TableSchema getTableSchema(String table, String database) {
15481625 }
15491626 }
15501627
1628+ public TableSchema getTableSchemaFromQuery (String sql , String name ) {
1629+ final String describeQuery = "DESC (" + sql + ") FORMAT " + ClickHouseFormat .TSKV .name ();
1630+
1631+ int operationTimeout = getOperationTimeout ();
1632+
1633+ try (QueryResponse response = operationTimeout == 0 ? query (describeQuery ).get () :
1634+ query (describeQuery ).get (getOperationTimeout (), TimeUnit .SECONDS )) {
1635+ return new TableSchemaParser ().readTSKV (response .getInputStream (), name , getDefaultDatabase ());
1636+ } catch (TimeoutException e ) {
1637+ throw new ClientException ("Operation has likely timed out after " + getOperationTimeout () + " seconds." , e );
1638+ } catch (ExecutionException e ) {
1639+ throw new ClientException ("Failed to get table schema" , e .getCause ());
1640+ } catch (Exception e ) {
1641+ throw new ClientException ("Failed to get table schema" , e );
1642+ }
1643+ }
1644+
15511645 /**
15521646 * <p>Executes a SQL command and doesn't care response. Useful for DDL statements, like `CREATE`, `DROP`, `ALTER`.
15531647 * Method however returns execution errors from a server or summary in case of successful execution. </p>
0 commit comments