|
16 | 16 | import com.clickhouse.client.api.insert.InsertSettings; |
17 | 17 | import com.clickhouse.client.api.insert.POJOSerializer; |
18 | 18 | import com.clickhouse.client.api.insert.SerializerNotFoundException; |
| 19 | +import com.clickhouse.client.api.internal.ClientStatisticsHolder; |
19 | 20 | import com.clickhouse.client.api.internal.SerializerUtils; |
20 | 21 | import com.clickhouse.client.api.internal.SettingsConverter; |
21 | 22 | import com.clickhouse.client.api.internal.TableSchemaParser; |
22 | 23 | import com.clickhouse.client.api.internal.ValidationUtils; |
23 | 24 | import com.clickhouse.client.api.metadata.TableSchema; |
24 | | -import com.clickhouse.client.api.internal.ClientStatisticsHolder; |
25 | 25 | import com.clickhouse.client.api.metrics.ClientMetrics; |
26 | 26 | import com.clickhouse.client.api.query.GenericRecord; |
27 | 27 | import com.clickhouse.client.api.query.QueryResponse; |
|
35 | 35 | import com.clickhouse.data.format.BinaryStreamUtils; |
36 | 36 | import org.slf4j.Logger; |
37 | 37 | import org.slf4j.LoggerFactory; |
38 | | -import org.slf4j.MDC; |
39 | 38 |
|
40 | 39 | import java.io.ByteArrayInputStream; |
41 | 40 | import java.io.ByteArrayOutputStream; |
@@ -708,21 +707,19 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec |
708 | 707 | final ClickHouseFormat format = settings.getFormat(); |
709 | 708 | request.format(format); |
710 | 709 |
|
711 | | - CompletableFuture<QueryResponse> future = new CompletableFuture<>(); |
712 | 710 | final QuerySettings finalSettings = settings; |
713 | | - queryExecutor.submit(() -> { |
714 | | - MDC.put("queryId", finalSettings.getQueryId()); |
| 711 | + CompletableFuture<QueryResponse> future = CompletableFuture.supplyAsync(() -> { |
715 | 712 | LOG.trace("Executing request: {}", request); |
716 | 713 | try { |
717 | 714 | QueryResponse queryResponse = new QueryResponse(client, request.execute(), finalSettings, format, clientStats); |
718 | 715 | queryResponse.ensureDone(); |
719 | | - future.complete(queryResponse); |
| 716 | + return queryResponse; |
| 717 | + } catch (ClientException e) { |
| 718 | + throw e; |
720 | 719 | } catch (Exception e) { |
721 | | - future.completeExceptionally(e); |
722 | | - } finally { |
723 | | - MDC.remove("queryId"); |
| 720 | + throw new ClientException("Failed to get query response", e); |
724 | 721 | } |
725 | | - }); |
| 722 | + }, queryExecutor); |
726 | 723 | return future; |
727 | 724 | } |
728 | 725 |
|
@@ -764,21 +761,20 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se |
764 | 761 | final ClickHouseFormat format = settings.getFormat(); |
765 | 762 | request.format(format); |
766 | 763 |
|
767 | | - CompletableFuture<Records> future = new CompletableFuture<>(); |
768 | 764 | final QuerySettings finalSettings = settings; |
769 | | - queryExecutor.submit(() -> { |
770 | | - MDC.put("queryId", finalSettings.getQueryId()); |
| 765 | + CompletableFuture<Records> future = CompletableFuture.supplyAsync(() -> { |
771 | 766 | LOG.trace("Executing request: {}", request); |
772 | 767 | try { |
773 | 768 | QueryResponse queryResponse = new QueryResponse(client, request.execute(), finalSettings, format, clientStats); |
774 | 769 | queryResponse.ensureDone(); |
775 | | - future.complete(new Records(queryResponse, finalSettings)); |
| 770 | + return new Records(queryResponse, finalSettings); |
| 771 | + } catch (ClientException e) { |
| 772 | + throw e; |
776 | 773 | } catch (Exception e) { |
777 | | - future.completeExceptionally(e); |
778 | | - } finally { |
779 | | - MDC.remove("queryId"); |
| 774 | + throw new ClientException("Failed to get query response", e); |
780 | 775 | } |
781 | | - }); |
| 776 | + }, queryExecutor); |
| 777 | + |
782 | 778 | return future; |
783 | 779 | } |
784 | 780 |
|
|
0 commit comments