Skip to content

Commit 00f39a4

Browse files
authored
Merge pull request #1120 from wprzytula/migrate-tablets-to-new-deserialization-framework
Migrate tablets to new deserialization framework
2 parents e99d697 + 453602c commit 00f39a4

File tree

4 files changed

+50
-29
lines changed

4 files changed

+50
-29
lines changed

scylla-cql/src/frame/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ pub struct ResponseBodyWithExtensions {
184184
pub trace_id: Option<Uuid>,
185185
pub warnings: Vec<String>,
186186
pub body: Bytes,
187-
pub custom_payload: Option<HashMap<String, Vec<u8>>>,
187+
pub custom_payload: Option<HashMap<String, Bytes>>,
188188
}
189189

190190
pub fn parse_response_body_extensions(

scylla-cql/src/frame/types.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
use super::frame_errors::LowLevelDeserializationError;
44
use super::TryFromPrimitiveError;
55
use byteorder::{BigEndian, ReadBytesExt};
6+
use bytes::Bytes;
7+
#[cfg(test)]
8+
use bytes::BytesMut;
69
use bytes::{Buf, BufMut};
710
use std::collections::HashMap;
811
use std::convert::TryFrom;
@@ -314,12 +317,12 @@ pub fn write_short_bytes(v: &[u8], buf: &mut impl BufMut) -> Result<(), std::num
314317

315318
pub fn read_bytes_map(
316319
buf: &mut &[u8],
317-
) -> Result<HashMap<String, Vec<u8>>, LowLevelDeserializationError> {
320+
) -> Result<HashMap<String, Bytes>, LowLevelDeserializationError> {
318321
let len = read_short_length(buf)?;
319322
let mut v = HashMap::with_capacity(len);
320323
for _ in 0..len {
321324
let key = read_string(buf)?.to_owned();
322-
let val = read_bytes(buf)?.to_owned();
325+
let val = Bytes::copy_from_slice(read_bytes(buf)?);
323326
v.insert(key, val);
324327
}
325328
Ok(v)
@@ -344,10 +347,10 @@ where
344347
#[test]
345348
fn type_bytes_map() {
346349
let mut val = HashMap::new();
347-
val.insert("".to_owned(), vec![]);
348-
val.insert("EXTENSION1".to_owned(), vec![1, 2, 3]);
349-
val.insert("EXTENSION2".to_owned(), vec![4, 5, 6]);
350-
let mut buf = Vec::new();
350+
val.insert("".to_owned(), Bytes::new());
351+
val.insert("EXTENSION1".to_owned(), Bytes::from_static(&[1, 2, 3]));
352+
val.insert("EXTENSION2".to_owned(), Bytes::from_static(&[4, 5, 6]));
353+
let mut buf = BytesMut::new();
351354
write_bytes_map(&val, &mut buf).unwrap();
352355
assert_eq!(read_bytes_map(&mut &*buf).unwrap(), val);
353356
}

scylla/src/transport/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ pub(crate) struct QueryResponse {
217217
pub(crate) tracing_id: Option<Uuid>,
218218
pub(crate) warnings: Vec<String>,
219219
#[allow(dead_code)] // This is not exposed to user (yet?)
220-
pub(crate) custom_payload: Option<HashMap<String, Vec<u8>>>,
220+
pub(crate) custom_payload: Option<HashMap<String, Bytes>>,
221221
}
222222

223223
// A QueryResponse in which response can not be Response::Error

scylla/src/transport/locator/tablets.rs

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
use bytes::Bytes;
12
use itertools::Itertools;
23
use lazy_static::lazy_static;
3-
use scylla_cql::cql_to_rust::FromCqlVal;
4-
use scylla_cql::frame::response::result::{deser_cql_value, ColumnType, TableSpec};
5-
use scylla_cql::types::deserialize::DeserializationError;
4+
use scylla_cql::frame::response::result::{ColumnType, TableSpec};
5+
use scylla_cql::types::deserialize::value::ListlikeIterator;
6+
use scylla_cql::types::deserialize::{
7+
DeserializationError, DeserializeValue, FrameSlice, TypeCheckError,
8+
};
69
use thiserror::Error;
710
use tracing::warn;
811
use uuid::Uuid;
@@ -11,12 +14,15 @@ use crate::routing::{Shard, Token};
1114
use crate::transport::Node;
1215

1316
use std::collections::{HashMap, HashSet};
17+
use std::ops::Deref;
1418
use std::sync::Arc;
1519

1620
#[derive(Error, Debug)]
1721
pub(crate) enum TabletParsingError {
1822
#[error(transparent)]
1923
Deserialization(#[from] DeserializationError),
24+
#[error(transparent)]
25+
TypeCheck(#[from] TypeCheckError),
2026
#[error("Shard id for tablet is negative: {0}")]
2127
ShardNum(i32),
2228
}
@@ -35,7 +41,8 @@ pub(crate) struct RawTablet {
3541
replicas: RawTabletReplicas,
3642
}
3743

38-
type RawTabletPayload = (i64, i64, Vec<(Uuid, i32)>);
44+
type RawTabletPayload<'frame, 'metadata> =
45+
(i64, i64, ListlikeIterator<'frame, 'metadata, (Uuid, i32)>);
3946

4047
lazy_static! {
4148
static ref RAW_TABLETS_CQL_TYPE: ColumnType<'static> = ColumnType::Tuple(vec![
@@ -52,29 +59,37 @@ const CUSTOM_PAYLOAD_TABLETS_V1_KEY: &str = "tablets-routing-v1";
5259

5360
impl RawTablet {
5461
pub(crate) fn from_custom_payload(
55-
payload: &HashMap<String, Vec<u8>>,
62+
payload: &HashMap<String, Bytes>,
5663
) -> Option<Result<RawTablet, TabletParsingError>> {
5764
let payload = payload.get(CUSTOM_PAYLOAD_TABLETS_V1_KEY)?;
58-
let cql_value = match deser_cql_value(&RAW_TABLETS_CQL_TYPE, &mut payload.as_slice()) {
59-
Ok(r) => r,
60-
Err(e) => return Some(Err(e.into())),
65+
66+
if let Err(err) =
67+
<RawTabletPayload as DeserializeValue<'_, '_>>::type_check(RAW_TABLETS_CQL_TYPE.deref())
68+
{
69+
return Some(Err(err.into()));
6170
};
6271

63-
// This could only fail if the type was wrong, but we do pass correct type
64-
// to `deser_cql_value`.
6572
let (first_token, last_token, replicas): RawTabletPayload =
66-
FromCqlVal::from_cql(cql_value).unwrap();
73+
match <RawTabletPayload as DeserializeValue<'_, '_>>::deserialize(
74+
RAW_TABLETS_CQL_TYPE.deref(),
75+
Some(FrameSlice::new(payload)),
76+
) {
77+
Ok(tuple) => tuple,
78+
Err(err) => return Some(Err(err.into())),
79+
};
6780

6881
let replicas = match replicas
69-
.into_iter()
70-
.map(|(uuid, shard_num)| match shard_num.try_into() {
71-
Ok(s) => Ok((uuid, s)),
72-
Err(_) => Err(shard_num),
82+
.map(|res| {
83+
res.map_err(TabletParsingError::from)
84+
.and_then(|(uuid, shard_num)| match shard_num.try_into() {
85+
Ok(s) => Ok((uuid, s)),
86+
Err(_) => Err(TabletParsingError::ShardNum(shard_num)),
87+
})
7388
})
74-
.collect::<Result<Vec<(Uuid, Shard)>, _>>()
89+
.collect::<Result<Vec<(Uuid, Shard)>, TabletParsingError>>()
7590
{
7691
Ok(r) => r,
77-
Err(shard_num) => return Some(Err(TabletParsingError::ShardNum(shard_num))),
92+
Err(err) => return Some(Err(err)),
7893
};
7994

8095
Some(Ok(RawTablet {
@@ -590,6 +605,7 @@ mod tests {
590605
use std::collections::{HashMap, HashSet};
591606
use std::sync::Arc;
592607

608+
use bytes::Bytes;
593609
use scylla_cql::frame::response::result::{ColumnType, CqlValue, TableSpec};
594610
use scylla_cql::types::serialize::value::SerializeValue;
595611
use scylla_cql::types::serialize::CellWriter;
@@ -618,8 +634,10 @@ mod tests {
618634

619635
#[test]
620636
fn test_raw_tablet_deser_trash() {
621-
let custom_payload =
622-
HashMap::from([(CUSTOM_PAYLOAD_TABLETS_V1_KEY.to_string(), vec![1, 2, 3])]);
637+
let custom_payload = HashMap::from([(
638+
CUSTOM_PAYLOAD_TABLETS_V1_KEY.to_string(),
639+
Bytes::from_static(&[1, 2, 3]),
640+
)]);
623641
assert_matches::assert_matches!(
624642
RawTablet::from_custom_payload(&custom_payload),
625643
Some(Err(TabletParsingError::Deserialization(_)))
@@ -648,7 +666,7 @@ mod tests {
648666
SerializeValue::serialize(&value, &col_type, CellWriter::new(&mut data)).unwrap();
649667
debug!("{:?}", data);
650668

651-
custom_payload.insert(CUSTOM_PAYLOAD_TABLETS_V1_KEY.to_string(), data);
669+
custom_payload.insert(CUSTOM_PAYLOAD_TABLETS_V1_KEY.to_string(), Bytes::from(data));
652670

653671
assert_matches::assert_matches!(
654672
RawTablet::from_custom_payload(&custom_payload),
@@ -688,7 +706,7 @@ mod tests {
688706
// Skipping length because `SerializeValue::serialize` adds length at the
689707
// start of serialized value while Scylla sends the value without initial
690708
// length.
691-
data[4..].to_vec(),
709+
Bytes::copy_from_slice(&data[4..]),
692710
);
693711

694712
let tablet = RawTablet::from_custom_payload(&custom_payload)

0 commit comments

Comments
 (0)