Skip to content

Commit 4273a6e

Browse files
authored
Expose summary in request (#397)
1 parent cd46772 commit 4273a6e

File tree

11 files changed

+393
-11
lines changed

11 files changed

+393
-11
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ clickhouse-macros = { version = "0.3.0", path = "macros" }
126126
clickhouse-types = { version = "0.1.2", path = "types" }
127127

128128
serde = { version = "1.0.106", features = ["derive"] }
129+
serde_json = "1"
129130

130131
thiserror = "2.0"
131132
bytes = { version = "1.5.0", features = ["serde"] }

src/cursors/bytes.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{cursors::RawCursor, error::Result, response::Response};
1+
use crate::{cursors::RawCursor, error::Result, query_summary::QuerySummary, response::Response};
22
use bytes::{Buf, Bytes, BytesMut};
33
use std::{
44
io::Result as IoResult,
@@ -123,6 +123,16 @@ impl BytesCursor {
123123
pub fn decoded_bytes(&self) -> u64 {
124124
self.raw.decoded_bytes()
125125
}
126+
127+
/// Returns the parsed `X-ClickHouse-Summary` response header, if
128+
/// present. Available once the response headers have been received.
129+
///
130+
/// Note: the summary values may be incomplete unless the query was
131+
/// executed with `wait_end_of_query=1`.
132+
#[inline]
133+
pub fn summary(&self) -> Option<&QuerySummary> {
134+
self.raw.summary()
135+
}
126136
}
127137

128138
impl AsyncRead for BytesCursor {

src/cursors/raw.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
22
error::Result,
3+
query_summary::QuerySummary,
34
response::{Chunks, Response, ResponseFuture},
45
};
56
use bytes::Bytes;
@@ -20,6 +21,7 @@ enum RawCursorState {
2021

2122
struct RawCursorLoading {
2223
chunks: Chunks,
24+
summary: Option<Box<QuerySummary>>,
2325
net_size: u64,
2426
data_size: u64,
2527
}
@@ -63,10 +65,15 @@ impl RawCursor {
6365
// in order to provide proper fused behavior of the cursor.
6466
let res = ready!(future.as_mut().poll(cx));
6567
let mut chunks = Chunks::empty();
66-
let res = res.map(|c| chunks = c);
68+
let mut summary = None;
69+
let res = res.map(|(c, s)| {
70+
chunks = c;
71+
summary = s;
72+
});
6773

6874
self.0 = RawCursorState::Loading(RawCursorLoading {
6975
chunks,
76+
summary,
7077
net_size: 0,
7178
data_size: 0,
7279
});
@@ -88,6 +95,13 @@ impl RawCursor {
8895
}
8996
}
9097

98+
pub(crate) fn summary(&self) -> Option<&QuerySummary> {
99+
match &self.0 {
100+
RawCursorState::Loading(state) => state.summary.as_deref(),
101+
RawCursorState::Waiting(_) => None,
102+
}
103+
}
104+
91105
#[cfg(feature = "futures03")]
92106
pub(crate) fn is_terminated(&self) -> bool {
93107
match &self.0 {

src/cursors/row.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::{
66
bytes_ext::BytesExt,
77
cursors::RawCursor,
88
error::{Error, Result},
9+
query_summary::QuerySummary,
910
response::Response,
1011
rowbinary,
1112
};
@@ -159,6 +160,16 @@ impl<T> RowCursor<T> {
159160
pub fn decoded_bytes(&self) -> u64 {
160161
self.raw.decoded_bytes()
161162
}
163+
164+
/// Returns the parsed `X-ClickHouse-Summary` response header, if
165+
/// present. Available once the response headers have been received.
166+
///
167+
/// Note: the summary values may be incomplete unless the query was
168+
/// executed with `wait_end_of_query=1`.
169+
#[inline]
170+
pub fn summary(&self) -> Option<&QuerySummary> {
171+
self.raw.summary()
172+
}
162173
}
163174

164175
#[cfg(feature = "futures03")]

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
pub use self::{
55
compression::Compression,
6+
query_summary::QuerySummary,
67
row::{Row, RowOwned, RowRead, RowWrite},
78
};
89
use self::{error::Result, http_client::HttpClient};
@@ -35,6 +36,7 @@ mod compression;
3536
mod cursors;
3637
mod headers;
3738
mod http_client;
39+
mod query_summary;
3840
mod request_body;
3941
mod response;
4042
mod row;

src/query_summary.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use std::collections::HashMap;
2+
3+
/// Parsed representation of the `X-ClickHouse-Summary` HTTP response header.
4+
///
5+
/// Provides typed getters for known fields and a generic [`get`](Self::get)
6+
/// fallback for forward-compatibility with future ClickHouse versions.
7+
///
8+
/// All getters return `Option<u64>`: `None` if the field is absent or cannot
9+
/// be parsed. This ensures deserialization never fails, even if ClickHouse
10+
/// renames, removes, or adds fields.
11+
///
12+
/// Note: the summary values may be incomplete unless the query was executed
13+
/// with `wait_end_of_query=1`, because ClickHouse sends this header before
14+
/// the response body and the values reflect progress at that point.
15+
#[derive(Debug, Clone)]
16+
pub struct QuerySummary {
17+
fields: HashMap<String, String>,
18+
}
19+
20+
impl QuerySummary {
21+
/// Returns the raw string value for the given key, if present.
22+
///
23+
/// Use this to access fields that are not yet covered by typed getters,
24+
/// e.g. fields added in newer ClickHouse versions.
25+
pub fn get(&self, key: &str) -> Option<&str> {
26+
self.fields.get(key).map(String::as_str)
27+
}
28+
29+
pub fn read_rows(&self) -> Option<u64> {
30+
self.get_u64("read_rows")
31+
}
32+
33+
pub fn read_bytes(&self) -> Option<u64> {
34+
self.get_u64("read_bytes")
35+
}
36+
37+
pub fn written_rows(&self) -> Option<u64> {
38+
self.get_u64("written_rows")
39+
}
40+
41+
pub fn written_bytes(&self) -> Option<u64> {
42+
self.get_u64("written_bytes")
43+
}
44+
45+
pub fn total_rows_to_read(&self) -> Option<u64> {
46+
self.get_u64("total_rows_to_read")
47+
}
48+
49+
pub fn result_rows(&self) -> Option<u64> {
50+
self.get_u64("result_rows")
51+
}
52+
53+
pub fn result_bytes(&self) -> Option<u64> {
54+
self.get_u64("result_bytes")
55+
}
56+
57+
pub fn elapsed_ns(&self) -> Option<u64> {
58+
self.get_u64("elapsed_ns")
59+
}
60+
61+
pub fn memory_usage(&self) -> Option<u64> {
62+
self.get_u64("memory_usage")
63+
}
64+
65+
fn get_u64(&self, key: &str) -> Option<u64> {
66+
self.fields.get(key)?.parse().ok()
67+
}
68+
69+
/// Parses the raw header value into a `QuerySummary`.
70+
///
71+
/// Returns `None` if the value is not valid JSON or not an object with
72+
/// string values. This matches ClickHouse's encoding, where all values
73+
/// are JSON strings (e.g. `"1000"` instead of `1000`).
74+
pub(crate) fn from_header(raw: &str) -> Option<Self> {
75+
let fields: HashMap<String, String> = serde_json::from_str(raw).ok()?;
76+
Some(Self { fields })
77+
}
78+
}

src/response.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::compression::lz4::Lz4Decoder;
1818
use crate::{
1919
compression::Compression,
2020
error::{Error, Result},
21+
query_summary::QuerySummary,
2122
};
2223

2324
// === Response ===
@@ -30,7 +31,8 @@ pub(crate) enum Response {
3031
Loading(Chunks),
3132
}
3233

33-
pub(crate) type ResponseFuture = Pin<Box<dyn Future<Output = Result<Chunks>> + Send>>;
34+
pub(crate) type ResponseFuture =
35+
Pin<Box<dyn Future<Output = Result<(Chunks, Option<Box<QuerySummary>>)>> + Send>>;
3436

3537
impl Response {
3638
pub(crate) fn new(response: HyperResponseFuture, compression: Compression) -> Self {
@@ -46,9 +48,16 @@ impl Response {
4648
.get("X-ClickHouse-Exception-Tag")
4749
.map(|value| value.as_bytes().into());
4850

51+
let summary = response
52+
.headers()
53+
.get("X-ClickHouse-Summary")
54+
.and_then(|v| v.to_str().ok())
55+
.and_then(QuerySummary::from_header)
56+
.map(Box::new);
57+
4958
// More likely to be successful, start streaming.
5059
// It still can fail, but we'll handle it in `DetectDbException`.
51-
Ok(Chunks::new(response.into_body(), compression, tag))
60+
Ok((Chunks::new(response.into_body(), compression, tag), summary))
5261
} else {
5362
// An instantly failed request.
5463
Err(collect_bad_response(
@@ -74,7 +83,10 @@ impl Response {
7483
pub(crate) async fn finish(&mut self) -> Result<()> {
7584
let chunks = loop {
7685
match self {
77-
Self::Waiting(future) => *self = Self::Loading(future.await?),
86+
Self::Waiting(future) => {
87+
let (chunks, _summary) = future.await?;
88+
*self = Self::Loading(chunks);
89+
}
7890
Self::Loading(chunks) => break chunks,
7991
}
8092
};
@@ -157,7 +169,9 @@ pub(crate) struct Chunk {
157169

158170
// * Uses `Option<_>` to make this stream fused.
159171
// * Uses `Box<_>` in order to reduce the size of cursors.
160-
pub(crate) struct Chunks(Option<Box<DetectDbException<Decompress<IncomingStream>>>>);
172+
pub(crate) struct Chunks {
173+
inner: Option<Box<DetectDbException<Decompress<IncomingStream>>>>,
174+
}
161175

162176
impl Chunks {
163177
fn new(stream: Incoming, compression: Compression, exception_tag: Option<Box<[u8]>>) -> Self {
@@ -167,16 +181,18 @@ impl Chunks {
167181
stream,
168182
exception_tag,
169183
};
170-
Self(Some(Box::new(stream)))
184+
Self {
185+
inner: Some(Box::new(stream)),
186+
}
171187
}
172188

173189
pub(crate) fn empty() -> Self {
174-
Self(None)
190+
Self { inner: None }
175191
}
176192

177193
#[cfg(feature = "futures03")]
178194
pub(crate) fn is_terminated(&self) -> bool {
179-
self.0.is_none()
195+
self.inner.is_none()
180196
}
181197
}
182198

@@ -185,11 +201,11 @@ impl Stream for Chunks {
185201

186202
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
187203
// We use `take()` to make the stream fused, including the case of panics.
188-
if let Some(mut stream) = self.0.take() {
204+
if let Some(mut stream) = self.inner.take() {
189205
let res = Pin::new(&mut stream).poll_next(cx);
190206

191207
if matches!(res, Poll::Pending | Poll::Ready(Some(Ok(_)))) {
192-
self.0 = Some(stream);
208+
self.inner = Some(stream);
193209
}
194210

195211
res

src/test/handlers.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,26 @@ where
6060
Thunk(Response::new(buffer.into()))
6161
}
6262

63+
// === provide_with_summary ===
64+
65+
/// Like [`provide`], but includes an `X-ClickHouse-Summary` response header.
66+
#[track_caller]
67+
pub fn provide_with_summary<T>(rows: impl IntoIterator<Item = T>, summary: &str) -> impl Handler
68+
where
69+
T: Serialize + Row,
70+
{
71+
let mut buffer = Vec::with_capacity(BUFFER_INITIAL_CAPACITY);
72+
for row in rows {
73+
rowbinary::serialize_row_binary(&mut buffer, &row).expect("failed to serialize");
74+
}
75+
Thunk(
76+
Response::builder()
77+
.header("X-ClickHouse-Summary", summary)
78+
.body(Bytes::from(buffer))
79+
.expect("invalid builder"),
80+
)
81+
}
82+
6383
// === record ===
6484

6585
struct RecordHandler<T>(PhantomData<T>);

tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ mod mock;
264264
mod nested;
265265
mod query;
266266
mod query_readonly;
267+
mod query_summary;
267268
mod query_syntax;
268269
mod rbwnat_header;
269270
mod rbwnat_smoke;

0 commit comments

Comments
 (0)