1010import com .clickhouse .client .api .data_formats .ClickHouseBinaryFormatReader ;
1111import com .clickhouse .client .api .data_formats .NativeFormatReader ;
1212import com .clickhouse .client .api .data_formats .RowBinaryFormatReader ;
13+ import com .clickhouse .client .api .data_formats .RowBinaryFormatSerializer ;
1314import com .clickhouse .client .api .data_formats .RowBinaryWithNamesAndTypesFormatReader ;
1415import com .clickhouse .client .api .data_formats .RowBinaryWithNamesFormatReader ;
1516import com .clickhouse .client .api .data_formats .internal .BinaryStreamReader ;
4344import com .clickhouse .client .api .query .Records ;
4445import com .clickhouse .client .config .ClickHouseClientOption ;
4546import com .clickhouse .data .ClickHouseColumn ;
46- import com .clickhouse .data .ClickHouseDataType ;
4747import com .clickhouse .data .ClickHouseFormat ;
4848import org .apache .hc .client5 .http .ConnectTimeoutException ;
4949import org .apache .hc .core5 .concurrent .DefaultThreadFactory ;
5959import java .io .ByteArrayOutputStream ;
6060import java .io .IOException ;
6161import java .io .InputStream ;
62+ import java .io .OutputStream ;
6263import java .lang .reflect .InvocationTargetException ;
6364import java .lang .reflect .Method ;
6465import java .net .ConnectException ;
@@ -623,6 +624,11 @@ public Builder useHttpCompression(boolean enabled) {
623624 return this ;
624625 }
625626
627+ public Builder appCompressedData (boolean enabled ) {
628+ this .configuration .put (ClientConfigProperties .APP_COMPRESSED_DATA .getKey (), String .valueOf (enabled ));
629+ return this ;
630+ }
631+
626632 /**
627633 * Sets buffer size for uncompressed data in LZ4 compression.
628634 * For outgoing data it is the size of a buffer that will be compressed.
@@ -1066,6 +1072,11 @@ public Client build() {
10661072
10671073 private static final int DEFAULT_NETWORK_BUFFER_SIZE = 300_000 ;
10681074
1075+ /**
1076+ * Default size for a buffers used in networking.
1077+ */
1078+ public static final int DEFAULT_BUFFER_SIZE = 8192 ;
1079+
10691080 private void setDefaults () {
10701081
10711082 // set default database name if not specified
@@ -1154,6 +1165,10 @@ private void setDefaults() {
11541165 if (!configuration .containsKey (ClientConfigProperties .USE_HTTP_COMPRESSION .getKey ())) {
11551166 useHttpCompression (false );
11561167 }
1168+
1169+ if (!configuration .containsKey (ClientConfigProperties .APP_COMPRESSED_DATA .getKey ())) {
1170+ appCompressedData (false );
1171+ }
11571172 }
11581173 }
11591174
@@ -1236,45 +1251,9 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {
12361251 schemaSerializers .put (column .getColumnName (), (obj , stream ) -> {
12371252 Object value = getterMethod .invoke (obj );
12381253
1239- if (defaultsSupport ) {
1240- if (value != null ) {//Because we now support defaults, we have to send nonNull
1241- SerializerUtils .writeNonNull (stream );//Write 0 for no default
1242-
1243- if (column .isNullable ()) {//If the column is nullable
1244- SerializerUtils .writeNonNull (stream );//Write 0 for not null
1245- }
1246- } else {//So if the object is null
1247- if (column .hasDefault ()) {
1248- SerializerUtils .writeNull (stream );//Send 1 for default
1249- return ;
1250- } else if (column .isNullable ()) {//And the column is nullable
1251- SerializerUtils .writeNonNull (stream );
1252- SerializerUtils .writeNull (stream );//Then we send null, write 1
1253- return ;//And we're done
1254- } else if (column .getDataType () == ClickHouseDataType .Array ) {//If the column is an array
1255- SerializerUtils .writeNonNull (stream );//Then we send nonNull
1256- } else {
1257- throw new IllegalArgumentException (String .format ("An attempt to write null into not nullable column '%s'" , column .getColumnName ()));
1258- }
1259- }
1260- } else {
1261- if (column .isNullable ()) {
1262- if (value == null ) {
1263- SerializerUtils .writeNull (stream );
1264- return ;
1265- }
1266- SerializerUtils .writeNonNull (stream );
1267- } else if (value == null ) {
1268- if (column .getDataType () == ClickHouseDataType .Array ) {
1269- SerializerUtils .writeNonNull (stream );
1270- } else {
1271- throw new IllegalArgumentException (String .format ("An attempt to write null into not nullable column '%s'" , column .getColumnName ()));
1272- }
1273- }
1254+ if (RowBinaryFormatSerializer .writeValuePreamble (stream , defaultsSupport , column , value )) {
1255+ SerializerUtils .serializeData (stream , value , column );
12741256 }
1275-
1276- //Handle the different types
1277- SerializerUtils .serializeData (stream , value , column );
12781257 });
12791258 } else {
12801259 LOG .warn ("No getter method found for column: {}" , propertyName );
@@ -1473,7 +1452,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, InputStream da
14731452 }
14741453
14751454 /**
1476- * <p> Sends write request to database. Input data is read from the input stream.</p>
1455+ * Sends write request to database. Input data is read from the input stream.
14771456 *
14781457 * @param tableName - destination table name
14791458 * @param data - data stream to insert
@@ -1482,7 +1461,49 @@ public CompletableFuture<InsertResponse> insert(String tableName, InputStream da
14821461 * @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
14831462 */
14841463 public CompletableFuture <InsertResponse > insert (String tableName ,
1485- InputStream data ,
1464+ InputStream data ,
1465+ ClickHouseFormat format ,
1466+ InsertSettings settings ) {
1467+
1468+ final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1469+ Integer .parseInt (configuration .getOrDefault (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey (),
1470+ ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getDefaultValue ())) :
1471+ settings .getInputStreamCopyBufferSize ();
1472+
1473+ if (writeBufferSize <= 0 ) {
1474+ throw new IllegalArgumentException ("Buffer size must be greater than 0" );
1475+ }
1476+
1477+ return insert (tableName , new DataStreamWriter () {
1478+ @ Override
1479+ public void onOutput (OutputStream out ) throws IOException {
1480+ byte [] buffer = new byte [writeBufferSize ];
1481+ int bytesRead ;
1482+ while ((bytesRead = data .read (buffer )) > 0 ) {
1483+ out .write (buffer , 0 , bytesRead );
1484+ }
1485+ out .close ();
1486+ }
1487+
1488+ @ Override
1489+ public void onRetry () throws IOException {
1490+ data .reset ();
1491+ }
1492+ },
1493+ format , settings );
1494+ }
1495+
1496+ /**
1497+ * Does an insert request to a server. Data is pushed when a {@link DataStreamWriter#onOutput(OutputStream)} is called.
1498+ *
1499+ * @param tableName - target table name
1500+ * @param writer - {@link DataStreamWriter} implementation
1501+ * @param format - source format
1502+ * @param settings - operation settings
1503+ * @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
1504+ */
1505+ public CompletableFuture <InsertResponse > insert (String tableName ,
1506+ DataStreamWriter writer ,
14861507 ClickHouseFormat format ,
14871508 InsertSettings settings ) {
14881509
@@ -1513,6 +1534,8 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15131534
15141535 settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format .name ());
15151536 final InsertSettings finalSettings = settings ;
1537+ final String sqlStmt = "INSERT INTO \" " + tableName + "\" FORMAT " + format .name ();
1538+ finalSettings .serverSetting (ClickHouseHttpProto .QPARAM_QUERY_STMT , sqlStmt );
15161539 responseSupplier = () -> {
15171540 // Selecting some node
15181541 ClickHouseNode selectedNode = getNextAliveNode ();
@@ -1523,17 +1546,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15231546 try (ClassicHttpResponse httpResponse =
15241547 httpClientHelper .executeRequest (selectedNode , finalSettings .getAllSettings (),
15251548 out -> {
1526- out .write ("INSERT INTO " .getBytes ());
1527- out .write (tableName .getBytes ());
1528- out .write (" FORMAT " .getBytes ());
1529- out .write (format .name ().getBytes ());
1530- out .write (" \n " .getBytes ());
1531-
1532- byte [] buffer = new byte [writeBufferSize ];
1533- int bytesRead ;
1534- while ((bytesRead = data .read (buffer )) > 0 ) {
1535- out .write (buffer , 0 , bytesRead );
1536- }
1549+ writer .onOutput (out );
15371550 out .close ();
15381551 })) {
15391552
@@ -1566,7 +1579,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15661579
15671580 if (i < maxRetries ) {
15681581 try {
1569- data . reset ();
1582+ writer . onRetry ();
15701583 } catch (IOException ioe ) {
15711584 throw new ClientException ("Failed to reset stream before next attempt" , ioe );
15721585 }
@@ -1581,12 +1594,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15811594
15821595 CompletableFuture <ClickHouseResponse > future = null ;
15831596 future = request .data (output -> {
1584- //Copy the data from the input stream to the output stream
1585- byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
1586- int bytesRead ;
1587- while ((bytesRead = data .read (buffer )) != -1 ) {
1588- output .write (buffer , 0 , bytesRead );
1589- }
1597+ writer .onOutput (output );
15901598 output .close ();
15911599 }).option (ClickHouseClientOption .ASYNC , false ).execute ();
15921600
0 commit comments