2525import com .clickhouse .client .api .query .QuerySettings ;
2626import com .clickhouse .data .ClickHouseFormat ;
2727import com .clickhouse .data .ClickHouseVersion ;
28+ import com .clickhouse .data .format .BinaryStreamUtils ;
29+ import net .jpountz .lz4 .LZ4Compressor ;
30+ import net .jpountz .lz4 .LZ4SafeDecompressor ;
31+ import org .apache .commons .compress .compressors .lz4 .FramedLZ4CompressorOutputStream ;
32+ import org .apache .commons .compress .compressors .snappy .SnappyCompressorOutputStream ;
2833import org .apache .commons .lang3 .StringEscapeUtils ;
2934import org .testcontainers .shaded .org .apache .commons .lang3 .RandomStringUtils ;
3035import org .testng .Assert ;
@@ -556,8 +561,8 @@ public void testCollectionInsert() throws Exception {
556561 System .setProperty ("org.slf4j.simpleLogger.defaultLogLevel" , "DEBUG" );
557562 }
558563
559- @ Test
560- public void testAppCompression () throws Exception {
564+ @ Test ( dataProvider = "testAppCompressionDataProvider" , dataProviderClass = InsertTests . class )
565+ public void testAppCompression (String algo ) throws Exception {
561566 String tableName = "very_long_table_name_with_uuid_" + UUID .randomUUID ().toString ().replace ('-' , '_' );
562567 String tableCreate = "CREATE TABLE \" " + tableName + "\" " +
563568 " (name String, " +
@@ -581,17 +586,31 @@ public void testAppCompression() throws Exception {
581586 byte [][] compressedData = new byte [data .length ][];
582587 for (int i = 0 ; i < data .length ; i ++) {
583588 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
584- GZIPOutputStream gz = new GZIPOutputStream (baos );
585- gz .write (data [i ].getBytes (StandardCharsets .UTF_8 ));
586- gz .finish ();
589+ if (algo .equalsIgnoreCase ("gzip" )) {
590+ GZIPOutputStream gz = new GZIPOutputStream (baos );
591+ gz .write (data [i ].getBytes (StandardCharsets .UTF_8 ));
592+ gz .finish ();
593+ } else if (algo .equalsIgnoreCase ("lz4" )) {
594+ FramedLZ4CompressorOutputStream lz4 = new FramedLZ4CompressorOutputStream (baos );
595+ lz4 .write (data [i ].getBytes (StandardCharsets .UTF_8 ));
596+ lz4 .finish ();
597+ } else if (algo .equalsIgnoreCase ("snappy" )) {
598+ byte bytes [] = data [i ].getBytes (StandardCharsets .UTF_8 );
599+
600+ SnappyCompressorOutputStream snappy = new SnappyCompressorOutputStream (baos ,bytes .length , 32 );
601+ snappy .write (bytes );
602+ snappy .finish ();
603+ }
587604 System .out .println ("Compressed size " + baos .size () + ", uncompressed size: " + data [i ].length ());
588605 compressedData [i ] = baos .toByteArray ();
589606 }
590607
591608 InsertSettings insertSettings = new InsertSettings ()
592- .appCompressedData (true , "gzip" );
609+ .appCompressedData (true , algo );
593610 try (InsertResponse response = client .insert (tableName , out -> {
594611 for (byte [] row : compressedData ) {
612+ // if (algo.)
613+ BinaryStreamUtils .writeVarInt (out , row .length );
595614 out .write (row );
596615 }
597616 }, ClickHouseFormat .JSONEachRow , insertSettings ).get ()) {
@@ -605,6 +624,15 @@ public void testAppCompression() throws Exception {
605624 }
606625 }
607626
627+ @ DataProvider (name = "testAppCompressionDataProvider" )
628+ public static Object [][] testAppCompressionDataProvider () {
629+ return new Object [][] {
630+ {"gzip" },
631+ {"lz4" },
632+ {"snappy" },
633+ };
634+ }
635+
608636 protected void initTable (String tableName , String createTableSQL ) throws Exception {
609637 client .execute ("DROP TABLE IF EXISTS " + tableName ).get (EXECUTE_CMD_TIMEOUT , TimeUnit .SECONDS );
610638 client .execute (createTableSQL ).get (EXECUTE_CMD_TIMEOUT , TimeUnit .SECONDS );
0 commit comments