Skip to content

Commit d26c42e

Browse files
committed
fixup after rebase
1 parent ca9eae4 commit d26c42e

File tree

7 files changed

+42
-58
lines changed

7 files changed

+42
-58
lines changed

benches/common_select.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,14 @@ impl BenchmarkOpts {
168168
Compression::None,
169169
#[cfg(feature = "lz4")]
170170
Compression::Lz4,
171+
#[cfg(feature = "zstd")]
172+
Compression::Zstd(-4),
173+
#[cfg(feature = "zstd")]
174+
Compression::Zstd(-1),
175+
#[cfg(feature = "zstd")]
176+
Compression::Zstd(1),
177+
#[cfg(feature = "zstd")]
178+
Compression::Zstd(zstd::DEFAULT_COMPRESSION_LEVEL),
171179
];
172180

173181
let validation_modes = [false, true];

benches/select_numbers.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,4 @@ async fn main() {
2929
for opts in BenchmarkOpts::permutations() {
3030
bench(opts).await;
3131
}
32-
#[cfg(feature = "zstd")]
33-
for level in [-4, -1, 1, zstd::DEFAULT_COMPRESSION_LEVEL] {
34-
bench(Compression::Zstd(level), false).await;
35-
bench(Compression::Zstd(level), true).await;
36-
}
3732
}

src/insert_formatted.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ impl InsertFormatted {
158158
db.operation.name = "INSERT",
159159
db.collection.name = collection_name,
160160
// ClickHouse-specific extension fields
161-
clickhouse.request.session_id = client.get_option(settings::SESSION_ID),
162-
clickhouse.request.query_id = client.get_option(settings::QUERY_ID),
161+
clickhouse.request.session_id = client.get_setting(settings::SESSION_ID),
162+
clickhouse.request.query_id = client.get_setting(settings::QUERY_ID),
163163
clickhouse.request.sent_rows = tracing::field::Empty,
164164
clickhouse.request.sent_bytes = tracing::field::Empty,
165165
clickhouse.request.encoded_bytes = tracing::field::Empty,
@@ -299,15 +299,11 @@ impl InsertFormatted {
299299

300300
#[cfg(any(feature = "lz4", feature = "zstd"))]
301301
let data = if self.compression.is_enabled() {
302-
CompressedData::from_slice(&data)
303-
.compressed
302+
CompressedData::new(&data, self.compression)?.compressed
304303
} else {
305304
data
306305
};
307306

308-
#[cfg(not(feature = "lz4"))]
309-
let original_size = to_u64_saturating(data.len());
310-
311307
self.send_inner(data, original_size).await
312308
}
313309

@@ -785,19 +781,23 @@ mod compression {
785781
/// # Errors
786782
/// Returns [`Error::Compression`] if `compression` is [`Compression::None`].
787783
pub fn new(data: &[u8], compression: Compression) -> Result<Self> {
784+
let original_size = to_u64_saturating(data.len());
785+
788786
match compression {
789787
Compression::None => Err(Error::Compression(
790788
"cannot pre-compress data when compression is disabled".into(),
791789
)),
792790
#[cfg(feature = "lz4")]
793791
#[allow(deprecated)]
794-
Compression::Lz4 | Compression::Lz4Hc(_) => {
795-
Ok(Self(crate::compression::lz4::compress(data)?))
796-
}
792+
Compression::Lz4 | Compression::Lz4Hc(_) => Ok(Self {
793+
compressed: crate::compression::lz4::compress(data)?,
794+
original_size,
795+
}),
797796
#[cfg(feature = "zstd")]
798-
Compression::Zstd(level) => {
799-
Ok(Self(crate::compression::zstd::compress(data, Some(level))?))
800-
}
797+
Compression::Zstd(level) => Ok(Self {
798+
compressed: crate::compression::zstd::compress(data, Some(level))?,
799+
original_size,
800+
}),
801801
}
802802
}
803803

src/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ impl Query {
194194
db.response.status_code = tracing::field::Empty,
195195
db.response.returned_rows = tracing::field::Empty,
196196
// ClickHouse-specific extension fields
197-
clickhouse.request.session_id = self.client.get_option(settings::SESSION_ID),
198-
clickhouse.request.query_id = self.client.get_option(settings::QUERY_ID),
197+
clickhouse.request.session_id = self.client.get_setting(settings::SESSION_ID),
198+
clickhouse.request.query_id = self.client.get_setting(settings::QUERY_ID),
199199
clickhouse.response.received_bytes = tracing::field::Empty,
200200
clickhouse.response.decoded_bytes = tracing::field::Empty,
201201
clickhouse.response.format = response_format,

src/response.rs

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ impl Response {
6262
pub(crate) async fn finish(&mut self) -> Result<()> {
6363
let chunks = loop {
6464
match self {
65-
Self::Waiting(future) => *self = Self::Loading(future.await?),
65+
Self::Waiting(future) => {
66+
let (chunks, _summary) = future.await?;
67+
*self = Self::Loading(chunks);
68+
}
6669
Self::Loading(chunks) => break chunks,
6770
}
6871
};
@@ -75,7 +78,7 @@ impl Response {
7578
async fn collect_response(
7679
response: HyperResponseFuture,
7780
compression: Compression,
78-
) -> Result<Chunks> {
81+
) -> Result<(Chunks, Option<Box<QuerySummary>>)> {
7982
let response = response.await?;
8083

8184
let status = response.status();
@@ -94,11 +97,11 @@ async fn collect_response(
9497
.map(|value| value.as_bytes().into());
9598

9699
let summary = response
97-
.headers()
98-
.get("X-ClickHouse-Summary")
99-
.and_then(|v| v.to_str().ok())
100-
.and_then(QuerySummary::from_header)
101-
.map(Box::new);// More likely to be successful, start streaming.
100+
.headers()
101+
.get("X-ClickHouse-Summary")
102+
.and_then(|v| v.to_str().ok())
103+
.and_then(QuerySummary::from_header)
104+
.map(Box::new); // More likely to be successful, start streaming.
102105
// It still can fail, but we'll handle it in `DetectDbException`.
103106
Ok((Chunks::new(response.into_body(), compression, tag), summary))
104107
} else {
@@ -117,28 +120,6 @@ async fn collect_response(
117120

118121
Err(error)
119122
}
120-
121-
pub(crate) fn into_future(self) -> ResponseFuture {
122-
match self {
123-
Self::Waiting(future) => future,
124-
Self::Loading(_) => panic!("response is already streaming"),
125-
}
126-
}
127-
128-
pub(crate) async fn finish(&mut self) -> Result<()> {
129-
let chunks = loop {
130-
match self {
131-
Self::Waiting(future) => {
132-
let (chunks, _summary) = future.await?;
133-
*self = Self::Loading(chunks);
134-
}
135-
Self::Loading(chunks) => break chunks,
136-
}
137-
};
138-
139-
while chunks.try_next().await?.is_some() {}
140-
Ok(())
141-
}
142123
}
143124

144125
#[cold]

tests/it/opentelemetry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async fn query_with_opentelemetry() {
3131

3232
let _numbers = client
3333
.query("SELECT * FROM system.numbers LIMIT 10")
34-
.with_option("query_id", query_id.clone())
34+
.with_setting("query_id", query_id.clone())
3535
.fetch_all::<u64>()
3636
.instrument(span)
3737
.await
@@ -90,7 +90,7 @@ async fn insert_with_opentelemetry() {
9090
.insert::<FooRow>("foo")
9191
.await
9292
.unwrap()
93-
.with_option("query_id", &query_id);
93+
.with_setting("query_id", &query_id);
9494

9595
for i in 0..10 {
9696
insert

tests/it/query_summary.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ async fn summary_with_wait_end_of_query() {
1313

1414
let mut cursor = client
1515
.query("SELECT * FROM test")
16-
.with_option("send_progress_in_http_headers", "1")
17-
.with_option("wait_end_of_query", "1")
16+
.with_setting("send_progress_in_http_headers", "1")
17+
.with_setting("wait_end_of_query", "1")
1818
.fetch::<SimpleRow>()
1919
.unwrap();
2020

@@ -44,7 +44,7 @@ async fn summary_without_wait_end_of_query() {
4444

4545
let mut cursor = client
4646
.query("SELECT * FROM test")
47-
.with_option("send_progress_in_http_headers", "1")
47+
.with_setting("send_progress_in_http_headers", "1")
4848
.fetch::<SimpleRow>()
4949
.unwrap();
5050

@@ -84,8 +84,8 @@ async fn summary_generic_get() {
8484

8585
let mut cursor = client
8686
.query("SELECT * FROM test")
87-
.with_option("send_progress_in_http_headers", "1")
88-
.with_option("wait_end_of_query", "1")
87+
.with_setting("send_progress_in_http_headers", "1")
88+
.with_setting("wait_end_of_query", "1")
8989
.fetch::<SimpleRow>()
9090
.unwrap();
9191

@@ -112,8 +112,8 @@ async fn summary_with_fetch_bytes() {
112112

113113
let mut cursor = client
114114
.query("SELECT * FROM test")
115-
.with_option("send_progress_in_http_headers", "1")
116-
.with_option("wait_end_of_query", "1")
115+
.with_setting("send_progress_in_http_headers", "1")
116+
.with_setting("wait_end_of_query", "1")
117117
.fetch_bytes("RowBinaryWithNamesAndTypes")
118118
.unwrap();
119119

0 commit comments

Comments
 (0)