Skip to content

Commit 0d86e59

Browse files
committed
Eagerly deserialize metadata
In order to support Scylla's metadata id extension we have to, when handling Response:Rows after EXECUTE, do the following: - Check if response contains "new metadata" flag - If it does, retrieve the metadata from the response - Store new metadata in PreparedStatement Doing that requires us to deserialize metadata, so we have to do that in Connection alread, instead of in "into_rows_result" as we did until now.
1 parent 0647f96 commit 0d86e59

File tree

7 files changed

+96
-73
lines changed

7 files changed

+96
-73
lines changed

scylla-cql/src/frame/frame_errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ pub enum CqlResultParseError {
235235
PreparedParseError(#[from] PreparedParseError),
236236
#[error("RESULT:Rows response deserialization failed: {0}")]
237237
RawRowsParseError(#[from] RawRowsAndPagingStateResponseParseError),
238+
#[error("RESULT:Rows result metadata response deserialization failed: {0}")]
239+
ResultMetadataParseError(#[from] ResultMetadataAndRowsCountParseError),
238240
}
239241

240242
#[non_exhaustive]

scylla/src/client/pager.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use scylla_cql::deserialize::row::{ColumnIterator, DeserializeRow};
1616
use scylla_cql::deserialize::{DeserializationError, TypeCheckError};
1717
use scylla_cql::frame::frame_errors::ResultMetadataAndRowsCountParseError;
1818
use scylla_cql::frame::request::query::PagingState;
19-
use scylla_cql::frame::response::NonErrorResponse;
20-
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
19+
use scylla_cql::frame::response::NonErrorResponseWithDeserializedMetadata;
20+
use scylla_cql::frame::response::result::DeserializedMetadataAndRawRows;
2121
use scylla_cql::frame::types::SerialConsistency;
2222
use scylla_cql::serialize::row::SerializedValues;
2323
use std::result::Result;
@@ -58,7 +58,7 @@ macro_rules! ready_some_ok {
5858
}
5959

6060
struct ReceivedPage {
61-
rows: RawMetadataAndRawRows,
61+
rows: DeserializedMetadataAndRawRows,
6262
tracing_id: Option<Uuid>,
6363
request_coordinator: Option<Coordinator>,
6464
}
@@ -75,7 +75,7 @@ pub(crate) struct PreparedPagerConfig {
7575
// A separate module is used here so that the parent module cannot construct
7676
// SendAttemptedProof directly.
7777
mod checked_channel_sender {
78-
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
78+
use scylla_cql::frame::response::result::DeserializedMetadataAndRawRows;
7979
use std::marker::PhantomData;
8080
use tokio::sync::mpsc;
8181
use uuid::Uuid;
@@ -119,7 +119,7 @@ mod checked_channel_sender {
119119
Result<(), mpsc::error::SendError<ResultPage>>,
120120
) {
121121
let empty_page = ReceivedPage {
122-
rows: RawMetadataAndRawRows::mock_empty(),
122+
rows: DeserializedMetadataAndRawRows::mock_empty(),
123123
tracing_id,
124124
request_coordinator,
125125
};
@@ -350,7 +350,9 @@ where
350350
match query_response {
351351
Ok(NonErrorQueryResponse {
352352
response:
353-
NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))),
353+
NonErrorResponseWithDeserializedMetadata::Result(
354+
result::ResultWithDeserializedMetadata::Rows((rows, paging_state_response)),
355+
),
354356
tracing_id,
355357
..
356358
}) => {
@@ -404,7 +406,7 @@ where
404406
Err(err)
405407
}
406408
Ok(NonErrorQueryResponse {
407-
response: NonErrorResponse::Result(_),
409+
response: NonErrorResponseWithDeserializedMetadata::Result(_),
408410
tracing_id,
409411
..
410412
}) => {
@@ -549,7 +551,9 @@ where
549551
let result = (self.fetcher)(paging_state).await?;
550552
let response = result.into_non_error_query_response()?;
551553
match response.response {
552-
NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))) => {
554+
NonErrorResponseWithDeserializedMetadata::Result(
555+
result::ResultWithDeserializedMetadata::Rows((rows, paging_state_response)),
556+
) => {
553557
let (proof, send_result) = self
554558
.sender
555559
.send(Ok(ReceivedPage {
@@ -574,7 +578,7 @@ where
574578
}
575579
}
576580
}
577-
NonErrorResponse::Result(_) => {
581+
NonErrorResponseWithDeserializedMetadata::Result(_) => {
578582
// We have most probably sent a modification statement (e.g. INSERT or UPDATE),
579583
// so let's return an empty iterator as suggested in #631.
580584

@@ -674,11 +678,7 @@ impl QueryPager {
674678

675679
let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx));
676680

677-
let raw_rows_with_deserialized_metadata =
678-
received_page.rows.deserialize_metadata().map_err(|err| {
679-
NextRowError::NextPageError(NextPageError::ResultMetadataParseError(err))
680-
})?;
681-
s.current_page = RawRowLendingIterator::new(raw_rows_with_deserialized_metadata);
681+
s.current_page = RawRowLendingIterator::new(received_page.rows);
682682

683683
if let Some(tracing_id) = received_page.tracing_id {
684684
s.tracing_ids.push(tracing_id);
@@ -1044,10 +1044,9 @@ If you are using this API, you are probably doing something wrong."
10441044
}
10451045
};
10461046
let page_received = page_received_res?;
1047-
let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?;
10481047

10491048
Ok(Self {
1050-
current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata),
1049+
current_page: RawRowLendingIterator::new(page_received.rows),
10511050
page_receiver: receiver,
10521051
tracing_ids: if let Some(tracing_id) = page_received.tracing_id {
10531052
vec![tracing_id]

scylla/src/client/session.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use arc_swap::ArcSwapOption;
4545
use futures::future::join_all;
4646
use futures::future::try_join_all;
4747
use itertools::Itertools;
48-
use scylla_cql::frame::response::NonErrorResponse;
48+
use scylla_cql::frame::response::NonErrorResponseWithDeserializedMetadata;
4949
use scylla_cql::serialize::batch::BatchValues;
5050
use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
5151
use std::borrow::Borrow;
@@ -1136,7 +1136,9 @@ impl Session {
11361136

11371137
let response = match run_request_result {
11381138
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1139-
response: NonErrorResponse::Result(result::Result::Void),
1139+
response: NonErrorResponseWithDeserializedMetadata::Result(
1140+
result::ResultWithDeserializedMetadata::Void,
1141+
),
11401142
tracing_id: None,
11411143
warnings: Vec::new(),
11421144
},
@@ -1512,7 +1514,9 @@ impl Session {
15121514

15131515
let response = match run_request_result {
15141516
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1515-
response: NonErrorResponse::Result(result::Result::Void),
1517+
response: NonErrorResponseWithDeserializedMetadata::Result(
1518+
result::ResultWithDeserializedMetadata::Void,
1519+
),
15161520
tracing_id: None,
15171521
warnings: Vec::new(),
15181522
},

scylla/src/network/connection.rs

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ use futures::{FutureExt, future::RemoteHandle};
3333
use scylla_cql::frame::frame_errors::CqlResponseParseError;
3434
use scylla_cql::frame::request::CqlRequestKind;
3535
use scylla_cql::frame::request::options::{self, Options};
36-
use scylla_cql::frame::response::Error;
3736
use scylla_cql::frame::response::authenticate::Authenticate;
3837
use scylla_cql::frame::response::result::{ResultMetadata, TableSpec};
3938
use scylla_cql::frame::response::{self, error};
39+
use scylla_cql::frame::response::{Error, ResponseWithDeserializedMetadata};
4040
use scylla_cql::frame::types::SerialConsistency;
4141
use scylla_cql::serialize::batch::{BatchValues, BatchValuesIterator};
4242
use scylla_cql::serialize::raw_batch::RawBatchValuesAdapter;
@@ -546,9 +546,11 @@ impl Connection {
546546
// Extract the response to STARTUP request and tidy up the errors.
547547
let response = match req_result {
548548
Ok(r) => match r.response {
549-
Response::Ready => NonErrorStartupResponse::Ready,
550-
Response::Authenticate(auth) => NonErrorStartupResponse::Authenticate(auth),
551-
Response::Error(Error { error, reason }) => {
549+
ResponseWithDeserializedMetadata::Ready => NonErrorStartupResponse::Ready,
550+
ResponseWithDeserializedMetadata::Authenticate(auth) => {
551+
NonErrorStartupResponse::Authenticate(auth)
552+
}
553+
ResponseWithDeserializedMetadata::Error(Error { error, reason }) => {
552554
return Err(err(ConnectionSetupRequestErrorKind::DbError(error, reason)));
553555
}
554556
_ => {
@@ -595,8 +597,8 @@ impl Connection {
595597
// Extract the supported options and tidy up the errors.
596598
let supported = match req_result {
597599
Ok(r) => match r.response {
598-
Response::Supported(supported) => supported,
599-
Response::Error(Error { error, reason }) => {
600+
ResponseWithDeserializedMetadata::Supported(supported) => supported,
601+
ResponseWithDeserializedMetadata::Error(Error { error, reason }) => {
600602
return Err(err(ConnectionSetupRequestErrorKind::DbError(error, reason)));
601603
}
602604
_ => {
@@ -648,10 +650,12 @@ impl Connection {
648650
.await?;
649651

650652
match query_response.response {
651-
Response::Error(error::Error { error, reason }) => {
653+
ResponseWithDeserializedMetadata::Error(error::Error { error, reason }) => {
652654
Err(RequestAttemptError::DbError(error, reason))
653655
}
654-
Response::Result(result::Result::Prepared(p)) => {
656+
ResponseWithDeserializedMetadata::Result(
657+
result::ResultWithDeserializedMetadata::Prepared(p),
658+
) => {
655659
let is_lwt = self
656660
.features
657661
.protocol_features
@@ -765,13 +769,13 @@ impl Connection {
765769
// Extract non-error response to AUTH_RESPONSE request and tidy up errors.
766770
let response = match req_result {
767771
Ok(r) => match r.response {
768-
Response::AuthSuccess(auth_success) => {
772+
ResponseWithDeserializedMetadata::AuthSuccess(auth_success) => {
769773
NonErrorAuthResponse::AuthSuccess(auth_success)
770774
}
771-
Response::AuthChallenge(auth_challenge) => {
775+
ResponseWithDeserializedMetadata::AuthChallenge(auth_challenge) => {
772776
NonErrorAuthResponse::AuthChallenge(auth_challenge)
773777
}
774-
Response::Error(Error { error, reason }) => {
778+
ResponseWithDeserializedMetadata::Error(Error { error, reason }) => {
775779
return Err(err(ConnectionSetupRequestErrorKind::DbError(error, reason)));
776780
}
777781
_ => {
@@ -951,7 +955,7 @@ impl Connection {
951955
}
952956

953957
match &query_response.response {
954-
Response::Error(frame::response::Error {
958+
ResponseWithDeserializedMetadata::Error(frame::response::Error {
955959
error: DbError::Unprepared { statement_id },
956960
..
957961
}) => {
@@ -1067,7 +1071,7 @@ impl Connection {
10671071
.map_err(RequestAttemptError::from)?;
10681072

10691073
return match query_response.response {
1070-
Response::Error(err) => match err.error {
1074+
ResponseWithDeserializedMetadata::Error(err) => match err.error {
10711075
DbError::Unprepared { statement_id } => {
10721076
debug!(
10731077
"Connection::batch: got DbError::Unprepared - repreparing statement with id {:?}",
@@ -1088,7 +1092,7 @@ impl Connection {
10881092
}
10891093
_ => Err(err.into()),
10901094
},
1091-
Response::Result(_) => Ok(query_response),
1095+
ResponseWithDeserializedMetadata::Result(_) => Ok(query_response),
10921096
_ => Err(RequestAttemptError::UnexpectedResponse(
10931097
query_response.response.to_response_kind(),
10941098
)),
@@ -1164,7 +1168,9 @@ impl Connection {
11641168
query_response: QueryResponse,
11651169
) -> Result<(), UseKeyspaceError> {
11661170
match query_response.response {
1167-
Response::Result(result::Result::SetKeyspace(set_keyspace)) => {
1171+
ResponseWithDeserializedMetadata::Result(
1172+
result::ResultWithDeserializedMetadata::SetKeyspace(set_keyspace),
1173+
) => {
11681174
if !set_keyspace
11691175
.keyspace_name
11701176
.eq_ignore_ascii_case(keyspace_name.as_str())
@@ -1180,7 +1186,7 @@ impl Connection {
11801186

11811187
Ok(())
11821188
}
1183-
Response::Error(err) => Err(UseKeyspaceError::RequestError(
1189+
ResponseWithDeserializedMetadata::Error(err) => Err(UseKeyspaceError::RequestError(
11841190
RequestAttemptError::DbError(err.error, err.reason),
11851191
)),
11861192
_ => Err(UseKeyspaceError::RequestError(
@@ -1204,8 +1210,8 @@ impl Connection {
12041210
// Extract the response and tidy up the errors.
12051211
match self.send_request(&register_frame, true, false, None).await {
12061212
Ok(r) => match r.response {
1207-
Response::Ready => Ok(()),
1208-
Response::Error(Error { error, reason }) => {
1213+
ResponseWithDeserializedMetadata::Ready => Ok(()),
1214+
ResponseWithDeserializedMetadata::Error(Error { error, reason }) => {
12091215
Err(err(ConnectionSetupRequestErrorKind::DbError(error, reason)))
12101216
}
12111217
_ => Err(err(ConnectionSetupRequestErrorKind::UnexpectedResponse(
@@ -1294,7 +1300,13 @@ impl Connection {
12941300
task_response.opcode,
12951301
body_with_ext.body,
12961302
cached_metadata,
1297-
)?;
1303+
)?
1304+
.deserialize_metadata()
1305+
.map_err(|e| {
1306+
ResponseParseError::CqlResponseParseError(CqlResponseParseError::CqlResultParseError(
1307+
e.into(),
1308+
))
1309+
})?;
12981310

12991311
Ok(QueryResponse {
13001312
response,
@@ -1727,7 +1739,7 @@ impl Connection {
17271739

17281740
let event = match Self::parse_response(task_response, compression, &features, None) {
17291741
Ok(r) => match r.response {
1730-
Response::Event(event) => event,
1742+
ResponseWithDeserializedMetadata::Event(event) => event,
17311743
_ => {
17321744
error!("Expected to receive Event response, got {:?}", r.response);
17331745
return Err(CqlEventHandlingError::UnexpectedResponse(

scylla/src/observability/driver_tracing.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use crate::response::query_result::QueryResult;
44
use crate::routing::{Shard, Token};
55
use crate::utils::safe_format::IteratorSafeFormatExt;
66
use itertools::Either;
7-
use scylla_cql::frame::response::result::ColumnSpec;
8-
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
7+
use scylla_cql::frame::response::result::{ColumnSpec, DeserializedMetadataAndRawRows};
98
use scylla_cql::value::deser_cql_value;
109
use std::borrow::Borrow;
1110
use std::fmt::Display;
@@ -108,14 +107,14 @@ impl RequestSpan {
108107
}
109108
}
110109

111-
pub(crate) fn record_raw_rows_fields(&self, raw_rows: &RawMetadataAndRawRows) {
110+
pub(crate) fn record_raw_rows_fields(&self, raw_rows: &DeserializedMetadataAndRawRows) {
112111
self.span
113112
.record("raw_result_size", raw_rows.metadata_and_rows_bytes_size());
114113
}
115114

116115
pub(crate) fn record_result_fields(&self, query_result: &QueryResult) {
117-
if let Some(raw_metadata_and_rows) = query_result.raw_metadata_and_rows() {
118-
self.record_raw_rows_fields(raw_metadata_and_rows);
116+
if let Some(raw_rows) = query_result.deserialized_metadata_and_rows() {
117+
self.record_raw_rows_fields(raw_rows);
119118
}
120119
}
121120

0 commit comments

Comments
 (0)