@@ -1204,14 +1204,17 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1204
1204
settings = new InsertSettings ();
1205
1205
}
1206
1206
1207
+ final InsertSettings requestSettings = new InsertSettings (buildRequestSettings (settings .getAllSettings ()));
1208
+
1207
1209
String operationId = registerOperationMetrics ();
1208
- settings .setOperationId (operationId );
1210
+ requestSettings .setOperationId (operationId );
1209
1211
globalClientStats .get (operationId ).start (ClientMetrics .OP_DURATION );
1210
1212
globalClientStats .get (operationId ).start (ClientMetrics .OP_SERIALIZATION );
1211
1213
1212
1214
1213
1215
boolean hasDefaults = this .tableSchemaHasDefaults .get (tableName );
1214
1216
ClickHouseFormat format = hasDefaults ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
1217
+ requestSettings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format );
1215
1218
TableSchema tableSchema = tableSchemaCache .get (tableName );
1216
1219
if (tableSchema == null ) {
1217
1220
throw new IllegalArgumentException ("Table schema not found for table: " + tableName + ". Did you forget to register it?" );
@@ -1235,8 +1238,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1235
1238
Integer retry = (Integer ) configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
1236
1239
final int maxRetries = retry == null ? 0 : retry ;
1237
1240
1238
- settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format );
1239
- final InsertSettings finalSettings = new InsertSettings (buildRequestSettings (settings .getAllSettings ()));
1241
+ requestSettings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format );
1240
1242
Supplier <InsertResponse > supplier = () -> {
1241
1243
long startTime = System .nanoTime ();
1242
1244
// Selecting some node
@@ -1246,7 +1248,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1246
1248
for (int i = 0 ; i <= maxRetries ; i ++) {
1247
1249
// Execute request
1248
1250
try (ClassicHttpResponse httpResponse =
1249
- httpClientHelper .executeRequest (selectedEndpoint , finalSettings .getAllSettings (), lz4Factory ,
1251
+ httpClientHelper .executeRequest (selectedEndpoint , requestSettings .getAllSettings (), lz4Factory ,
1250
1252
out -> {
1251
1253
out .write ("INSERT INTO " .getBytes ());
1252
1254
out .write (tableName .getBytes ());
@@ -1278,14 +1280,14 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1278
1280
OperationMetrics metrics = new OperationMetrics (clientStats );
1279
1281
String summary = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .HEADER_SRV_SUMMARY ), "{}" );
1280
1282
ProcessParser .parseSummary (summary , metrics );
1281
- String queryId = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .HEADER_QUERY_ID ), finalSettings .getQueryId (), String ::valueOf );
1283
+ String queryId = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .HEADER_QUERY_ID ), requestSettings .getQueryId (), String ::valueOf );
1282
1284
metrics .operationComplete ();
1283
1285
metrics .setQueryId (queryId );
1284
1286
return new InsertResponse (metrics );
1285
1287
} catch (Exception e ) {
1286
1288
lastException = httpClientHelper .wrapException (String .format ("Query request failed (Attempt: %s/%s - Duration: %s)" ,
1287
1289
(i + 1 ), (maxRetries + 1 ), System .nanoTime () - startTime ), e );
1288
- if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1290
+ if (httpClientHelper .shouldRetry (e , requestSettings .getAllSettings ())) {
1289
1291
LOG .warn ("Retrying." , e );
1290
1292
selectedEndpoint = getNextAliveNode ();
1291
1293
} else {
@@ -1296,7 +1298,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1296
1298
throw new ClientException ("Insert request failed after attempts: " + (maxRetries + 1 ) + " - Duration: " + (System .nanoTime () - startTime ), lastException );
1297
1299
};
1298
1300
1299
- return runAsyncOperation (supplier , settings .getAllSettings ());
1301
+ return runAsyncOperation (supplier , requestSettings .getAllSettings ());
1300
1302
1301
1303
}
1302
1304
@@ -1415,8 +1417,13 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1415
1417
DataStreamWriter writer ,
1416
1418
ClickHouseFormat format ,
1417
1419
InsertSettings settings ) {
1420
+ if (settings == null ) {
1421
+ throw new IllegalArgumentException ("Settings cannot be null" );
1422
+ }
1423
+ final InsertSettings requestSettings = new InsertSettings (buildRequestSettings (settings .getAllSettings ()));
1424
+ requestSettings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format );
1418
1425
1419
- String operationId = settings .getOperationId ();
1426
+ String operationId = requestSettings .getOperationId ();
1420
1427
ClientStatisticsHolder clientStats = null ;
1421
1428
if (operationId != null ) {
1422
1429
clientStats = globalClientStats .remove (operationId );
@@ -1430,17 +1437,14 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1430
1437
1431
1438
Supplier <InsertResponse > responseSupplier ;
1432
1439
1433
- final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1440
+ final int writeBufferSize = requestSettings .getInputStreamCopyBufferSize () <= 0 ?
1434
1441
(int ) configuration .get (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey ()) :
1435
- settings .getInputStreamCopyBufferSize ();
1442
+ requestSettings .getInputStreamCopyBufferSize ();
1436
1443
1437
1444
if (writeBufferSize <= 0 ) {
1438
1445
throw new IllegalArgumentException ("Buffer size must be greater than 0" );
1439
1446
}
1440
1447
1441
- settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format );
1442
- final InsertSettings finalSettings = new InsertSettings (buildRequestSettings (settings .getAllSettings ()));
1443
-
1444
1448
StringBuilder sqlStmt = new StringBuilder ("INSERT INTO " ).append (tableName );
1445
1449
if (columnNames != null && !columnNames .isEmpty ()) {
1446
1450
sqlStmt .append (" (" );
@@ -1451,7 +1455,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1451
1455
sqlStmt .append (")" );
1452
1456
}
1453
1457
sqlStmt .append (" FORMAT " ).append (format .name ());
1454
- finalSettings .serverSetting (ClickHouseHttpProto .QPARAM_QUERY_STMT , sqlStmt .toString ());
1458
+ requestSettings .serverSetting (ClickHouseHttpProto .QPARAM_QUERY_STMT , sqlStmt .toString ());
1455
1459
responseSupplier = () -> {
1456
1460
long startTime = System .nanoTime ();
1457
1461
// Selecting some node
@@ -1461,7 +1465,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1461
1465
for (int i = 0 ; i <= retries ; i ++) {
1462
1466
// Execute request
1463
1467
try (ClassicHttpResponse httpResponse =
1464
- httpClientHelper .executeRequest (selectedEndpoint , finalSettings .getAllSettings (), lz4Factory ,
1468
+ httpClientHelper .executeRequest (selectedEndpoint , requestSettings .getAllSettings (), lz4Factory ,
1465
1469
out -> {
1466
1470
writer .onOutput (out );
1467
1471
out .close ();
@@ -1478,14 +1482,14 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1478
1482
OperationMetrics metrics = new OperationMetrics (finalClientStats );
1479
1483
String summary = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .HEADER_SRV_SUMMARY ), "{}" );
1480
1484
ProcessParser .parseSummary (summary , metrics );
1481
- String queryId = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .HEADER_QUERY_ID ), finalSettings .getQueryId (), String ::valueOf );
1485
+ String queryId = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .HEADER_QUERY_ID ), requestSettings .getQueryId (), String ::valueOf );
1482
1486
metrics .operationComplete ();
1483
1487
metrics .setQueryId (queryId );
1484
1488
return new InsertResponse (metrics );
1485
1489
} catch (Exception e ) {
1486
1490
lastException = httpClientHelper .wrapException (String .format ("Insert failed (Attempt: %s/%s - Duration: %s)" ,
1487
1491
(i + 1 ), (retries + 1 ), System .nanoTime () - startTime ), e );
1488
- if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1492
+ if (httpClientHelper .shouldRetry (e , requestSettings .getAllSettings ())) {
1489
1493
LOG .warn ("Retrying." , e );
1490
1494
selectedEndpoint = getNextAliveNode ();
1491
1495
} else {
@@ -1502,10 +1506,10 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1502
1506
}
1503
1507
}
1504
1508
LOG .warn ("Insert request failed after attempts: " + (retries + 1 ) + " - Duration: " + (System .nanoTime () - startTime ));
1505
- throw lastException ;
1509
+ throw ( lastException == null ? new ClientException ( "Failed to complete insert operation" ) : lastException ) ;
1506
1510
};
1507
1511
1508
- return runAsyncOperation (responseSupplier , settings .getAllSettings ());
1512
+ return runAsyncOperation (responseSupplier , requestSettings .getAllSettings ());
1509
1513
}
1510
1514
1511
1515
/**
@@ -1564,18 +1568,19 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
1564
1568
if (settings == null ) {
1565
1569
settings = new QuerySettings ();
1566
1570
}
1567
- if (settings .getFormat () == null ) {
1568
- settings .setFormat (ClickHouseFormat .RowBinaryWithNamesAndTypes );
1571
+ final QuerySettings requestSettings = new QuerySettings (buildRequestSettings (settings .getAllSettings ()));
1572
+
1573
+ if (requestSettings .getFormat () == null ) {
1574
+ requestSettings .setFormat (ClickHouseFormat .RowBinaryWithNamesAndTypes );
1569
1575
}
1570
1576
ClientStatisticsHolder clientStats = new ClientStatisticsHolder ();
1571
1577
clientStats .start (ClientMetrics .OP_DURATION );
1572
1578
1573
1579
Supplier <QueryResponse > responseSupplier ;
1574
1580
1575
1581
if (queryParams != null ) {
1576
- settings .setOption (HttpAPIClientHelper .KEY_STATEMENT_PARAMS , queryParams );
1582
+ requestSettings .setOption (HttpAPIClientHelper .KEY_STATEMENT_PARAMS , queryParams );
1577
1583
}
1578
- final QuerySettings finalSettings = new QuerySettings (buildRequestSettings (settings .getAllSettings ()));
1579
1584
responseSupplier = () -> {
1580
1585
long startTime = System .nanoTime ();
1581
1586
// Selecting some node
@@ -1585,7 +1590,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
1585
1590
ClassicHttpResponse httpResponse = null ;
1586
1591
try {
1587
1592
httpResponse =
1588
- httpClientHelper .executeRequest (selectedEndpoint , finalSettings .getAllSettings (), lz4Factory , output -> {
1593
+ httpClientHelper .executeRequest (selectedEndpoint , requestSettings .getAllSettings (), lz4Factory , output -> {
1589
1594
output .write (sqlQuery .getBytes (StandardCharsets .UTF_8 ));
1590
1595
output .close ();
1591
1596
});
@@ -1602,22 +1607,22 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
1602
1607
.getFirstHeader (ClickHouseHttpProto .HEADER_SRV_SUMMARY ), "{}" );
1603
1608
ProcessParser .parseSummary (summary , metrics );
1604
1609
String queryId = HttpAPIClientHelper .getHeaderVal (httpResponse
1605
- .getFirstHeader (ClickHouseHttpProto .HEADER_QUERY_ID ), finalSettings .getQueryId ());
1610
+ .getFirstHeader (ClickHouseHttpProto .HEADER_QUERY_ID ), requestSettings .getQueryId ());
1606
1611
metrics .setQueryId (queryId );
1607
1612
metrics .operationComplete ();
1608
1613
Header formatHeader = httpResponse .getFirstHeader (ClickHouseHttpProto .HEADER_FORMAT );
1609
- ClickHouseFormat responseFormat = finalSettings .getFormat ();
1614
+ ClickHouseFormat responseFormat = requestSettings .getFormat ();
1610
1615
if (formatHeader != null ) {
1611
1616
responseFormat = ClickHouseFormat .valueOf (formatHeader .getValue ());
1612
1617
}
1613
1618
1614
- return new QueryResponse (httpResponse , responseFormat , finalSettings , metrics );
1619
+ return new QueryResponse (httpResponse , responseFormat , requestSettings , metrics );
1615
1620
1616
1621
} catch (Exception e ) {
1617
1622
httpClientHelper .closeQuietly (httpResponse );
1618
1623
lastException = httpClientHelper .wrapException (String .format ("Query request failed (Attempt: %s/%s - Duration: %s)" ,
1619
1624
(i + 1 ), (retries + 1 ), System .nanoTime () - startTime ), e );
1620
- if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1625
+ if (httpClientHelper .shouldRetry (e , requestSettings .getAllSettings ())) {
1621
1626
LOG .warn ("Retrying." , e );
1622
1627
selectedEndpoint = getNextAliveNode ();
1623
1628
} else {
@@ -1626,10 +1631,10 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
1626
1631
}
1627
1632
}
1628
1633
LOG .warn ("Query request failed after attempts: " + (retries + 1 ) + " - Duration: " + (System .nanoTime () - startTime ));
1629
- throw lastException ;
1634
+ throw ( lastException == null ? new ClientException ( "Failed to complete query" ) : lastException ) ;
1630
1635
};
1631
1636
1632
- return runAsyncOperation (responseSupplier , settings .getAllSettings ());
1637
+ return runAsyncOperation (responseSupplier , requestSettings .getAllSettings ());
1633
1638
}
1634
1639
public CompletableFuture <QueryResponse > query (String sqlQuery , Map <String , Object > queryParams ) {
1635
1640
return query (sqlQuery , queryParams , null );
@@ -1674,10 +1679,11 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, Map<String, Obje
1674
1679
if (settings == null ) {
1675
1680
settings = new QuerySettings ();
1676
1681
}
1677
- settings .setFormat (ClickHouseFormat .RowBinaryWithNamesAndTypes );
1678
- settings .waitEndOfQuery (true ); // we rely on the summery
1682
+ final QuerySettings requestSettings = new QuerySettings (buildRequestSettings (settings .getAllSettings ()));
1683
+ requestSettings .setFormat (ClickHouseFormat .RowBinaryWithNamesAndTypes );
1684
+ requestSettings .waitEndOfQuery (true ); // we rely on the summery
1679
1685
1680
- return query (sqlQuery , params , settings ).thenApply (response -> {
1686
+ return query (sqlQuery , params , requestSettings ).thenApply (response -> {
1681
1687
try {
1682
1688
1683
1689
return new Records (response , newBinaryFormatReader (response ));
@@ -1708,9 +1714,11 @@ public List<GenericRecord> queryAll(String sqlQuery, Map<String, Object> params,
1708
1714
}
1709
1715
try {
1710
1716
int operationTimeout = getOperationTimeout ();
1711
- settings .setFormat (ClickHouseFormat .RowBinaryWithNamesAndTypes )
1712
- .waitEndOfQuery (true );
1713
- CompletableFuture <QueryResponse > f = query (sqlQuery , params , settings );
1717
+ final QuerySettings requestSettings = new QuerySettings (buildRequestSettings (settings .getAllSettings ()));
1718
+ requestSettings .setFormat (ClickHouseFormat .RowBinaryWithNamesAndTypes );
1719
+ requestSettings .waitEndOfQuery (true );
1720
+
1721
+ CompletableFuture <QueryResponse > f = query (sqlQuery , params , requestSettings );
1714
1722
try (QueryResponse response = operationTimeout == 0 ? f .get () : f .get (operationTimeout , TimeUnit .MILLISECONDS )) {
1715
1723
List <GenericRecord > records = new ArrayList <>();
1716
1724
if (response .getResultRows () > 0 ) {
0 commit comments