11package com .clickhouse .client .api ;
22
33import com .clickhouse .client .*;
4+ import com .clickhouse .client .api .exception .ClientException ;
5+ import com .clickhouse .client .api .insert .InsertResponse ;
6+ import com .clickhouse .client .api .insert .InsertSettings ;
7+ import com .clickhouse .client .api .insert .POJOSerializer ;
8+ import com .clickhouse .client .api .internal .SerializerUtils ;
49import com .clickhouse .client .api .internal .SettingsConverter ;
510import com .clickhouse .client .api .internal .ValidationUtils ;
6- import com .clickhouse .data .ClickHouseColumn ;
11+ import com .clickhouse .client .config .ClickHouseClientOption ;
12+ import com .clickhouse .data .*;
713
14+ import java .io .ByteArrayInputStream ;
15+ import java .io .ByteArrayOutputStream ;
16+ import java .io .IOException ;
817import java .io .InputStream ;
18+ import java .lang .reflect .InvocationTargetException ;
19+ import java .lang .reflect .Method ;
20+ import java .math .BigDecimal ;
21+ import java .math .BigInteger ;
22+ import java .net .Inet4Address ;
23+ import java .net .Inet6Address ;
924import java .net .SocketException ;
1025import java .time .Duration ;
26+ import java .time .LocalDate ;
27+ import java .time .LocalDateTime ;
1128import java .time .temporal .ChronoUnit ;
1229import java .util .*;
1330import com .clickhouse .client .api .metadata .TableSchema ;
1431import com .clickhouse .client .api .internal .TableSchemaParser ;
1532import com .clickhouse .client .api .query .QueryResponse ;
1633import com .clickhouse .client .api .query .QuerySettings ;
17- import com .clickhouse .data .ClickHouseFormat ;
34+ import com .clickhouse .data .format .BinaryStreamUtils ;
35+ import org .apache .commons .lang3 .time .StopWatch ;
1836import org .slf4j .Logger ;
1937import org .slf4j .LoggerFactory ;
2038import org .slf4j .MDC ;
21- import org .slf4j .helpers .BasicMDCAdapter ;
2239
2340import java .util .concurrent .CompletableFuture ;
41+ import java .util .concurrent .ExecutionException ;
2442import java .util .concurrent .Future ;
2543
2644import static java .time .temporal .ChronoUnit .SECONDS ;
@@ -30,14 +48,20 @@ public class Client {
3048 private Set <String > endpoints ;
3149 private Map <String , String > configuration ;
3250 private List <ClickHouseNode > serverNodes = new ArrayList <>();
33- private static final Logger LOG = LoggerFactory .getLogger (Client .class );
51+ private Map <Class <?>, List <POJOSerializer >> serializers ;//Order is important to preserve for RowBinary
52+ private Map <Class <?>, Map <String , Method >> getterMethods ;
53+ private Map <Class <?>, Boolean > hasDefaults ;
54+ private static final Logger LOG = LoggerFactory .getLogger (Client .class );
3455
3556 private Client (Set <String > endpoints , Map <String ,String > configuration ) {
3657 this .endpoints = endpoints ;
3758 this .configuration = configuration ;
3859 this .endpoints .forEach (endpoint -> {
3960 this .serverNodes .add (ClickHouseNode .of (endpoint , this .configuration ));
4061 });
62+ this .serializers = new HashMap <>();
63+ this .getterMethods = new HashMap <>();
64+ this .hasDefaults = new HashMap <>();
4165 }
4266
4367 public static class Builder {
@@ -159,29 +183,144 @@ public boolean ping(int timeout) {
159183 * Register the POJO
160184 */
161185 public void register (Class <?> clazz , TableSchema schema ) {
162- //This is just a placeholder
186+ LOG .debug ("Registering POJO: {}" , clazz .getName ());
187+
163188 //Create a new POJOSerializer with static .serialize(object, columns) methods
189+ List <POJOSerializer > serializers = new ArrayList <>();
190+ Map <String , Method > getterMethods = new HashMap <>();
191+
192+ for (Method method : clazz .getMethods ()) {//Clean up the method names
193+ String methodName = method .getName ();
194+ if (methodName .startsWith ("get" ) || methodName .startsWith ("has" )) {
195+ methodName = methodName .substring (3 ).toLowerCase ();
196+ getterMethods .put (methodName , method );
197+ } if (methodName .startsWith ("is" )) {
198+ methodName = methodName .substring (2 ).toLowerCase ();
199+ getterMethods .put (methodName , method );
200+ }
201+ }
202+ this .getterMethods .put (clazz , getterMethods );//Store the getter methods for later use
203+
204+ for (ClickHouseColumn column : schema .getColumns ()) {
205+ String columnName = column .getColumnName ().toLowerCase ().replace ("_" , "" );
206+ serializers .add ((obj , stream ) -> {
207+ if (!getterMethods .containsKey (columnName )) {
208+ LOG .warn ("No getter method found for column: {}" , columnName );
209+ return ;
210+ }
211+ Method getterMethod = this .getterMethods .get (clazz ).get (columnName );
212+ Object value = getterMethod .invoke (obj );
213+ boolean hasDefaults = this .hasDefaults .get (clazz );
214+
215+ //Handle null values
216+ if (value == null ) {
217+ if (hasDefaults && !column .hasDefault ()) {//Send this only if there is no default
218+ BinaryStreamUtils .writeNonNull (stream );
219+ }
220+ BinaryStreamUtils .writeNull (stream );//We send this regardless of default or nullable
221+ return ;
222+ }
223+
224+ //Handle default
225+ if (hasDefaults ) {
226+ BinaryStreamUtils .writeNonNull (stream );//Write 0
227+ }
228+
229+ //Handle nullable
230+ if (column .isNullable ()) {
231+ BinaryStreamUtils .writeNonNull (stream );//Write 0
232+ }
233+
234+ //Handle the different types
235+ SerializerUtils .serializeData (stream , value , column );
236+ });
237+ }
238+ this .serializers .put (clazz , serializers );
239+ this .hasDefaults .put (clazz , schema .hasDefaults ());
164240 }
165241
166242 /**
167243 * Insert data into ClickHouse using a POJO
168244 */
169- public Future <InsertResponse > insert (String tableName ,
170- List <Object > data ,
171- InsertSettings settings ,
172- List <ClickHouseColumn > columns ) throws ClickHouseException , SocketException {
245+ public InsertResponse insert (String tableName ,
246+ List <Object > data ,
247+ InsertSettings settings ) throws ClientException , IOException {
248+ if (data == null || data .isEmpty ()) {
249+ throw new IllegalArgumentException ("Data cannot be empty" );
250+ }
251+ StopWatch watch = StopWatch .createStarted ();
252+
253+ //Add format to the settings
254+ if (settings == null ) {
255+ settings = new InsertSettings ();
256+ }
257+
258+ boolean hasDefaults = this .hasDefaults .get (data .get (0 ).getClass ());
259+ if (hasDefaults ) {
260+ settings .setFormat (ClickHouseFormat .RowBinaryWithDefaults );
261+ } else {
262+ settings .setFormat (ClickHouseFormat .RowBinary );
263+ }
264+
265+
266+ //Create an output stream to write the data to
267+ ByteArrayOutputStream stream = new ByteArrayOutputStream ();
268+
173269 //Lookup the Serializer for the POJO
270+ List <POJOSerializer > serializers = this .serializers .get (data .get (0 ).getClass ());
271+ if (serializers == null || serializers .isEmpty ()) {
272+ throw new IllegalArgumentException ("No serializer found for the given class. Please register() before calling this method." );
273+ }
274+
174275 //Call the static .serialize method on the POJOSerializer for each object in the list
175- return null ;//This is just a placeholder
276+ for (Object obj : data ) {
277+ for (POJOSerializer serializer : serializers ) {
278+ try {
279+ serializer .serialize (obj , stream );
280+ } catch (InvocationTargetException | IllegalAccessException | IOException e ) {
281+ throw new ClientException (e );
282+ }
283+ }
284+ }
285+
286+ watch .stop ();
287+ LOG .debug ("Total serialization time: {}" , watch .getTime ());
288+ return insert (tableName , new ByteArrayInputStream (stream .toByteArray ()), settings );
176289 }
177290
178291 /**
179292 * Insert data into ClickHouse using a binary stream
180293 */
181- public Future < InsertResponse > insert (String tableName ,
294+ public InsertResponse insert (String tableName ,
182295 InputStream data ,
183- InsertSettings settings ) throws ClickHouseException , SocketException {
184- return null ;//This is just a placeholder
296+ InsertSettings settings ) throws IOException , ClientException {
297+ StopWatch watch = StopWatch .createStarted ();
298+ InsertResponse response ;
299+ try (ClickHouseClient client = createClient ()) {
300+ ClickHouseRequest .Mutation request = createMutationRequest (client .write (getServerNode ()), tableName , settings )
301+ .format (settings .getFormat ());
302+
303+ Future <ClickHouseResponse > future ;
304+ try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
305+ future = request .data (stream .getInputStream ()).execute ();
306+
307+ //Copy the data from the input stream to the output stream
308+ byte [] buffer = new byte [settings .getInputStreamBatchSize ()];
309+ int bytesRead ;
310+ while ((bytesRead = data .read (buffer )) != -1 ) {
311+ stream .write (buffer , 0 , bytesRead );
312+ }
313+ }
314+ try {
315+ response = new InsertResponse (client , future .get ());
316+ } catch (InterruptedException | ExecutionException e ) {
317+ throw new ClientException ("Operation has likely timed out." , e );
318+ }
319+ }
320+
321+ watch .stop ();
322+ LOG .debug ("Total insert (InputStream) time: {}" , watch .getTime ());
323+ return response ;
185324 }
186325
187326
@@ -228,11 +367,27 @@ public TableSchema getTableSchema(String table, String database) {
228367
229368 private ClickHouseClient createClient () {
230369 ClickHouseConfig clientConfig = new ClickHouseConfig ();
231- return ClickHouseClient .builder ().config (clientConfig )
370+ return ClickHouseClient .builder ()
371+ .config (clientConfig )
232372 .nodeSelector (ClickHouseNodeSelector .of (ClickHouseProtocol .HTTP ))
233373 .build ();
234374 }
235375
376+ private ClickHouseRequest .Mutation createMutationRequest (ClickHouseRequest .Mutation request , String tableName , InsertSettings settings ) {
377+ if (settings == null ) return request .table (tableName );
378+
379+ if (settings .getSetting ("query_id" ) != null ) {
380+ request .table (tableName , settings .getSetting ("query_id" ).toString ());
381+ } else {
382+ request .table (tableName );
383+ }
384+
385+ if (settings .getSetting ("insert_deduplication_token" ) != null ) {
386+ request .set ("insert_deduplication_token" , settings .getSetting ("insert_deduplication_token" ).toString ());
387+ }
388+ return request ;
389+ }
390+
236391 private static final Set <String > COMPRESS_ALGORITHMS = ValidationUtils .whiteList ("LZ4" , "LZ4HC" , "ZSTD" , "ZSTDHC" , "NONE" );
237392
238393 public static Set <String > getCompressAlgorithms () {
0 commit comments