2727import com .clickhouse .data .ClickHouseFormat ;
2828import com .clickhouse .data .ClickHouseVersion ;
2929import com .clickhouse .data .format .BinaryStreamUtils ;
30+ import lombok .Data ;
3031import net .jpountz .lz4 .LZ4Compressor ;
3132import net .jpountz .lz4 .LZ4Factory ;
3233import net .jpountz .lz4 .LZ4SafeDecompressor ;
@@ -270,16 +271,16 @@ public void insertRawData() throws Exception {
270271 assertEquals (records .size (), 1000 );
271272 }
272273
273- @ Test (groups = { "integration" }, enabled = true )
274- public void insertRawDataAsync () throws Exception {
274+ @ Test (groups = { "integration" }, dataProvider = "insertRawDataAsyncProvider" , dataProviderClass = InsertTests . class )
275+ public void insertRawDataAsync (boolean async ) throws Exception {
275276 final String tableName = "raw_data_table_async" ;
276277 final String createSQL = "CREATE TABLE " + tableName +
277278 " (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()" ;
278279
279280 initTable (tableName , createSQL );
280281
281282 InsertSettings localSettings = new InsertSettings (settings .getAllSettings ());
282- localSettings .setOption (ClientConfigProperties .ASYNC_OPERATIONS .getKey (), true );
283+ localSettings .setOption (ClientConfigProperties .ASYNC_OPERATIONS .getKey (), async );
283284 ByteArrayOutputStream data = new ByteArrayOutputStream ();
284285 PrintWriter writer = new PrintWriter (data );
285286 for (int i = 0 ; i < 1000 ; i ++) {
@@ -293,10 +294,20 @@ public void insertRawDataAsync() throws Exception {
293294
294295 List <GenericRecord > records = client .queryAll ("SELECT * FROM " + tableName );
295296 assertEquals (records .size (), 1000 );
297+ assertTrue (Thread .currentThread ().getName ()
298+ .startsWith (async ? "ForkJoinPool.commonPool" : "main" ), "Threads starts with " + Thread .currentThread ().getName ());
296299 })
297300 .join (); // wait operation complete. only for tests
298301 }
299302
303+ @ DataProvider
304+ public static Object [][] insertRawDataAsyncProvider (){
305+ return new Object [][] {
306+ {true }, // async
307+ {false } // blocking
308+ };
309+ }
310+
300311 @ Test (groups = { "integration" }, dataProvider = "insertRawDataSimpleDataProvider" , dataProviderClass = InsertTests .class )
301312 public void insertRawDataSimple (String tableName ) throws Exception {
302313// final String tableName = "raw_data_table";
0 commit comments