5151import org .apache .hc .core5 .http .ConnectionRequestTimeoutException ;
5252import org .apache .hc .core5 .http .HttpStatus ;
5353import org .apache .hc .core5 .http .NoHttpResponseException ;
54- import org .objectweb .asm .Type ;
5554import org .slf4j .Logger ;
5655import org .slf4j .LoggerFactory ;
5756
@@ -124,13 +123,13 @@ public class Client implements AutoCloseable {
124123 private final Set <String > endpoints ;
125124 private final Map <String , String > configuration ;
126125 private final List <ClickHouseNode > serverNodes = new ArrayList <>();
127- private final Map <Class <?>, Map <String , POJOSerializer >> serializers ;
128- private final Map <Class <?>, Map <String , Method >> getterMethods ;
129126
130- private final Map <Class <?>, Map <String , POJOSetter >> deserializers ;
131- private final Map <Class <?>, Map <String , Method >> setterMethods ;
127+ // POJO serializer mapping (class -> (schema -> (format -> serializer)))
128+ private final Map <Class <?>, Map <String , Map <String , POJOSerializer >>> serializers ;
129+
130+ // POJO deserializer mapping (class -> (schema -> (format -> deserializer)))
131+ private final Map <Class <?>, Map <String , Map <String , POJOSetter >>> deserializers ;
132132
133- private final Map <Class <?>, Boolean > hasDefaults ; // Whether the POJO has defaults
134133 private static final Logger LOG = LoggerFactory .getLogger (Client .class );
135134 private final ExecutorService sharedOperationExecutor ;
136135
@@ -141,6 +140,7 @@ public class Client implements AutoCloseable {
141140 private ClickHouseClient oldClient = null ;
142141
143142 private Map <String , TableSchema > tableSchemaCache = new ConcurrentHashMap <>();
143+ private Map <String , Boolean > tableSchemaHasDefaults = new ConcurrentHashMap <>();
144144
145145 private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ,
146146 ExecutorService sharedOperationExecutor ) {
@@ -150,10 +150,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
150150 this .serverNodes .add (ClickHouseNode .of (endpoint , this .configuration ));
151151 });
152152 this .serializers = new ConcurrentHashMap <>();
153- this .getterMethods = new ConcurrentHashMap <>();
154- this .hasDefaults = new ConcurrentHashMap <>();
155153 this .deserializers = new ConcurrentHashMap <>();
156- this .setterMethods = new ConcurrentHashMap <>();
157154
158155 boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClickHouseClientOption .ASYNC .getKey ());
159156 if (isAsyncEnabled && sharedOperationExecutor == null ) {
@@ -933,15 +930,24 @@ public boolean ping(long timeout) {
933930
934931 /**
935932 * <p>Registers a POJO class and maps its fields to a table schema</p>
936- * <p>Note: table schema will be stored in cache to be used while other operations. Call this method
933+ * <p>Note: table schema will be stored in cache to be used while other operations. Cache key is
934+ * {@link TableSchema schemaId}. Call this method
937935 * to update cache.</p>
938936 *
939937 * @param clazz - class of a POJO
940938 * @param schema - correlating table schema
941939 */
942940 public synchronized void register (Class <?> clazz , TableSchema schema ) {
943941 LOG .debug ("Registering POJO: {}" , clazz .getName ());
944- tableSchemaCache .put (schema .getTableName (), schema );
942+ String schemaKey ;
943+ if (schema .getTableName () != null && schema .getQuery () == null ) {
944+ schemaKey = schema .getTableName ();
945+ } else if (schema .getQuery () != null && schema .getTableName () == null ) {
946+ schemaKey = schema .getQuery ();
947+ } else {
948+ throw new IllegalArgumentException ("Table schema has both query and table name set. Only one is allowed." );
949+ }
950+ tableSchemaCache .put (schemaKey , schema );
945951
946952 //Create a new POJOSerializer with static .serialize(object, columns) methods
947953 Map <String , Method > classGetters = new HashMap <>();
@@ -959,19 +965,16 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {
959965 classSetters .put (methodName , method );
960966 }
961967 }
962- this .getterMethods .put (clazz , classGetters );//Store the getter methods for later use
963- this .setterMethods .put (clazz , classSetters );//Store the setter methods for later use
964- this .hasDefaults .put (clazz , schema .hasDefaults ());
965968
966- Map <String , POJOSerializer > classSerializers = new HashMap <>();
967- Map <String , POJOSetter > classDeserializers = new ConcurrentHashMap <>();
969+ Map <String , POJOSerializer > schemaSerializers = new HashMap <>();
970+ Map <String , POJOSetter > schemaDeserializers = new ConcurrentHashMap <>();
971+ boolean defaultsSupport = schema .hasDefaults ();
972+ tableSchemaHasDefaults .put (schemaKey , defaultsSupport );
968973 for (ClickHouseColumn column : schema .getColumns ()) {
969974 String propertyName = column .getColumnName ().toLowerCase ().replace ("_" , "" ).replace ("." , "" );
970-
971975 Method getterMethod = classGetters .get (propertyName );
972- boolean defaultsSupport = this .hasDefaults .get (clazz );
973976 if (getterMethod != null ) {
974- classSerializers .put (schema . getTableName () + "." + column .getColumnName (), (obj , stream ) -> {
977+ schemaSerializers .put (column .getColumnName (), (obj , stream ) -> {
975978 Object value = getterMethod .invoke (obj );
976979
977980 if (defaultsSupport ) {
@@ -1017,15 +1020,18 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {
10171020
10181021 // Deserialization stuff
10191022 Method setterMethod = classSetters .get (propertyName );
1020- String columnName = column .getColumnName ();
10211023 if (setterMethod != null ) {
1022- classDeserializers .put (columnName , SerializerUtils .compilePOJOSetter (setterMethod , column ));
1024+ schemaDeserializers .put (column . getColumnName () , SerializerUtils .compilePOJOSetter (setterMethod , column ));
10231025 } else {
10241026 LOG .warn ("No setter method found for column: {}" , propertyName );
10251027 }
10261028 }
1027- this .serializers .put (clazz , classSerializers );
1028- this .deserializers .put (clazz , classDeserializers );
1029+
1030+ Map <String , Map <String , POJOSerializer >> classSerializers = serializers .computeIfAbsent (clazz , k -> new HashMap <>());
1031+ Map <String , Map <String , POJOSetter >> classDeserializers = deserializers .computeIfAbsent (clazz , k -> new HashMap <>());
1032+
1033+ classSerializers .put (schemaKey , schemaSerializers );
1034+ classDeserializers .put (schemaKey , schemaDeserializers );
10291035 }
10301036
10311037 /**
@@ -1087,17 +1093,18 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10871093 settings = new InsertSettings ();
10881094 }
10891095
1090- boolean hasDefaults = this .hasDefaults .get (data . get ( 0 ). getClass () );
1096+ boolean hasDefaults = this .tableSchemaHasDefaults .get (tableName );
10911097 ClickHouseFormat format = hasDefaults ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
10921098 TableSchema tableSchema = tableSchemaCache .get (tableName );
10931099 if (tableSchema == null ) {
1094- tableSchema = getTableSchema ( tableName );
1100+ throw new IllegalArgumentException ( "Table schema not found for table: " + tableName + ". Did you forget to register it?" );
10951101 }
10961102 //Lookup the Serializer for the POJO
1097- Map <String , POJOSerializer > classSerializers = serializers .getOrDefault (data .get (0 ).getClass (), Collections .emptyMap ());
1103+ Map <String , POJOSerializer > classSerializers = serializers .getOrDefault (data .get (0 ).getClass (), Collections .emptyMap ())
1104+ .getOrDefault (tableName , Collections .emptyMap ());
10981105 List <POJOSerializer > serializersForTable = new ArrayList <>();
10991106 for (ClickHouseColumn column : tableSchema .getColumns ()) {
1100- POJOSerializer serializer = classSerializers .get (tableName + "." + column .getColumnName ());
1107+ POJOSerializer serializer = classSerializers .get (column .getColumnName ());
11011108 if (serializer == null ) {
11021109 throw new IllegalArgumentException ("No serializer found for column '" + column .getColumnName () + "'. Did you forget to register it?" );
11031110 }
@@ -1562,8 +1569,8 @@ public List<GenericRecord> queryAll(String sqlQuery) {
15621569 }
15631570 }
15641571
1565- public <T > List <T > queryAll (String sqlQuery , Class <T > clazz ) {
1566- return queryAll (sqlQuery , clazz , null );
1572+ public <T > List <T > queryAll (String sqlQuery , Class <T > clazz , TableSchema schema ) {
1573+ return queryAll (sqlQuery , clazz , schema , null );
15671574 }
15681575
15691576 /**
@@ -1575,6 +1582,8 @@ public <T> List<T> queryAll(String sqlQuery, Class<T> clazz) {
15751582 * <p>{@code class} should be registered before calling this method using {@link #register(Class, TableSchema)}</p>
15761583 * <p>Internally deserializer is compiled at the register stage. Compilation is done using ASM library by
15771584 * writing a bytecode</p>
1585+ * <p>Note: this method will cache schema and it will use sql as a key for storage.</p>
1586+ *
15781587 *
15791588 * @param sqlQuery - query to execute
15801589 * @param clazz - class of the DTO
@@ -1583,11 +1592,13 @@ public <T> List<T> queryAll(String sqlQuery, Class<T> clazz) {
15831592 * @return List of POJOs filled with data
15841593 * @param <T>
15851594 */
1586- public <T > List <T > queryAll (String sqlQuery , Class <T > clazz , Supplier <T > allocator ) {
1587- Map <String , POJOSetter > classDeserializers = deserializers .get (clazz );
1595+ public <T > List <T > queryAll (String sqlQuery , Class <T > clazz , TableSchema schema , Supplier <T > allocator ) {
1596+ Map <String , POJOSetter > classDeserializers = deserializers .getOrDefault (clazz ,
1597+ Collections .emptyMap ()).getOrDefault (schema .getTableName () == null ?
1598+ schema .getQuery () : schema .getTableName (), Collections .emptyMap ());
15881599
1589- if (classDeserializers == null || classDeserializers .isEmpty ()) {
1590- throw new IllegalArgumentException ("No deserializers found for class '" + clazz + "'. Did you forget to register it?" );
1600+ if (classDeserializers .isEmpty ()) {
1601+ throw new IllegalArgumentException ("No deserializers found for the query and class '" + clazz + "'. Did you forget to register it?" );
15911602 }
15921603
15931604 try {
@@ -1641,26 +1652,25 @@ public TableSchema getTableSchema(String table) {
16411652 */
16421653 public TableSchema getTableSchema (String table , String database ) {
16431654 final String sql = "DESCRIBE TABLE " + table + " FORMAT " + ClickHouseFormat .TSKV .name ();
1644- return getTableSchemaImpl (sql , table , database );
1655+ return getTableSchemaImpl (sql , table , null , database );
16451656 }
16461657
16471658 /**
16481659 * <p>Creates table schema from a query.</p>
1649- * <p>Note: this method will no cache table schema </p>
16501660 * @param sql - SQL query which schema to return
1651- * @return
1661+ * @return table schema for the query
16521662 */
16531663 public TableSchema getTableSchemaFromQuery (String sql ) {
16541664 final String describeQuery = "DESC (" + sql + ") FORMAT " + ClickHouseFormat .TSKV .name ();
1655- return getTableSchemaImpl (describeQuery , UUID . randomUUID (). toString () , getDefaultDatabase ());
1665+ return getTableSchemaImpl (describeQuery , null , sql , getDefaultDatabase ());
16561666 }
16571667
1658- private TableSchema getTableSchemaImpl (String describeQuery , String name , String database ) {
1668+ private TableSchema getTableSchemaImpl (String describeQuery , String name , String originalQuery , String database ) {
16591669 int operationTimeout = getOperationTimeout ();
16601670
16611671 try (QueryResponse response = operationTimeout == 0 ? query (describeQuery ).get () :
16621672 query (describeQuery ).get (getOperationTimeout (), TimeUnit .SECONDS )) {
1663- return new TableSchemaParser ().readTSKV (response .getInputStream (), name , database );
1673+ return new TableSchemaParser ().readTSKV (response .getInputStream (), name , originalQuery , database );
16641674 } catch (TimeoutException e ) {
16651675 throw new ClientException ("Operation has likely timed out after " + getOperationTimeout () + " seconds." , e );
16661676 } catch (ExecutionException e ) {
0 commit comments