1+ use byteorder:: { BigEndian , ReadBytesExt } ;
12use bytes:: { BufMut , Bytes , BytesMut } ;
23use smallvec:: { smallvec, SmallVec } ;
34use std:: convert:: TryInto ;
@@ -6,6 +7,9 @@ use std::time::Duration;
67use thiserror:: Error ;
78use uuid:: Uuid ;
89
10+ use scylla_cql:: frame:: frame_errors:: ParseError ;
11+ use scylla_cql:: frame:: response:: result:: { deser_cql_value, CqlValue } ;
12+
913use super :: StatementConfig ;
1014use crate :: frame:: response:: result:: PreparedMetadata ;
1115use crate :: frame:: types:: { Consistency , SerialConsistency } ;
@@ -337,3 +341,191 @@ pub enum PartitionKeyError {
337341 #[ error( "Value bytes too long to create partition key, max 65 535 allowed! value.len(): {0}" ) ]
338342 ValueTooLong ( usize ) ,
339343}
344+
345+ // The PartitionKeyDecoder reverses the process of PreparedStatement::compute_partition_key:
346+ // it returns the consecutive values of partition key column that were encoded
347+ // by the function into the Bytes object, additionally decoding them to CqlValue.
348+ //
349+ // The format follows the description here:
350+ // <https://github.com/scylladb/scylla/blob/40adf38915b6d8f5314c621a94d694d172360833/compound_compat.hh#L33-L47>
351+ //
352+ // TODO: Currently, if there is a null value specified for a partition column,
353+ // it will be skipped when creating the serialized partition key. We should
354+ // not create such partition keys in the future, i.e. fail the request or
355+ // route it to a random replica instead and let the DB reject it. In the
356+ // meantime, this struct will return some garbage data while it tries to
357+ // decode the key, but nothing bad like a panic should happen otherwise.
358+ // The struct is currently only used for printing the partition key, so that's
359+ // completely fine.
360+ #[ derive( Clone , Copy ) ]
361+ pub ( crate ) struct PartitionKeyDecoder < ' pk > {
362+ prepared_metadata : & ' pk PreparedMetadata ,
363+ partition_key : & ' pk [ u8 ] ,
364+ value_index : usize ,
365+ }
366+
367+ impl < ' pk > PartitionKeyDecoder < ' pk > {
368+ pub ( crate ) fn new ( prepared_metadata : & ' pk PreparedMetadata , partition_key : & ' pk [ u8 ] ) -> Self {
369+ Self {
370+ prepared_metadata,
371+ partition_key,
372+ value_index : 0 ,
373+ }
374+ }
375+ }
376+
377+ impl < ' pk > Iterator for PartitionKeyDecoder < ' pk > {
378+ type Item = Result < CqlValue , ParseError > ;
379+
380+ fn next ( & mut self ) -> Option < Self :: Item > {
381+ if self . value_index >= self . prepared_metadata . pk_indexes . len ( ) {
382+ return None ;
383+ }
384+
385+ let col_idx = self . prepared_metadata . pk_indexes [ self . value_index ] . index as usize ;
386+ let spec = & self . prepared_metadata . col_specs [ col_idx] ;
387+
388+ let cell = if self . prepared_metadata . pk_indexes . len ( ) == 1 {
389+ Ok ( self . partition_key )
390+ } else {
391+ self . parse_cell ( )
392+ } ;
393+ self . value_index += 1 ;
394+ Some ( cell. and_then ( |mut cell| deser_cql_value ( & spec. typ , & mut cell) ) )
395+ }
396+ }
397+
398+ impl < ' pk > PartitionKeyDecoder < ' pk > {
399+ fn parse_cell ( & mut self ) -> Result < & ' pk [ u8 ] , ParseError > {
400+ let buf = & mut self . partition_key ;
401+ let len = buf. read_u16 :: < BigEndian > ( ) ? as usize ;
402+ if buf. len ( ) < len {
403+ return Err ( ParseError :: IoError ( std:: io:: Error :: new (
404+ std:: io:: ErrorKind :: UnexpectedEof ,
405+ "value too short" ,
406+ ) ) ) ;
407+ }
408+ let col = & buf[ ..len] ;
409+ * buf = & buf[ len..] ;
410+ buf. read_u8 ( ) ?;
411+ Ok ( col)
412+ }
413+ }
414+
415+ #[ cfg( test) ]
416+ mod tests {
417+ use bytes:: { BufMut , Bytes , BytesMut } ;
418+ use scylla_cql:: frame:: response:: result:: {
419+ ColumnSpec , ColumnType , CqlValue , PartitionKeyIndex , PreparedMetadata , TableSpec ,
420+ } ;
421+
422+ use super :: PartitionKeyDecoder ;
423+
424+ fn make_meta (
425+ cols : impl IntoIterator < Item = ColumnType > ,
426+ idx : impl IntoIterator < Item = usize > ,
427+ ) -> PreparedMetadata {
428+ let table_spec = TableSpec {
429+ ks_name : "ks" . to_owned ( ) ,
430+ table_name : "t" . to_owned ( ) ,
431+ } ;
432+ let col_specs: Vec < _ > = cols
433+ . into_iter ( )
434+ . enumerate ( )
435+ . map ( |( i, typ) | ColumnSpec {
436+ name : format ! ( "col_{}" , i) ,
437+ table_spec : table_spec. clone ( ) ,
438+ typ,
439+ } )
440+ . collect ( ) ;
441+ let pk_indexes = idx
442+ . into_iter ( )
443+ . enumerate ( )
444+ . map ( |( sequence, index) | PartitionKeyIndex {
445+ index : index as u16 ,
446+ sequence : sequence as u16 ,
447+ } )
448+ . collect ( ) ;
449+ PreparedMetadata {
450+ flags : 0 ,
451+ col_count : col_specs. len ( ) ,
452+ col_specs,
453+ pk_indexes,
454+ }
455+ }
456+
457+ fn make_key < ' pk > ( cols : impl IntoIterator < Item = & ' pk [ u8 ] > ) -> Bytes {
458+ let cols: Vec < _ > = cols. into_iter ( ) . collect ( ) ;
459+ // TODO: Use compute_partition_key or one of the variants
460+ // after they are moved to a more sensible place
461+ // instead of constructing the PK manually
462+ let mut b = BytesMut :: new ( ) ;
463+ if cols. len ( ) == 1 {
464+ b. extend_from_slice ( cols[ 0 ] ) ;
465+ } else {
466+ for c in cols {
467+ b. put_i16 ( c. len ( ) as i16 ) ;
468+ b. extend_from_slice ( c) ;
469+ b. put_u8 ( 0 ) ;
470+ }
471+ }
472+ b. freeze ( )
473+ }
474+
475+ #[ test]
476+ fn test_pk_decoder_single_column ( ) {
477+ let meta = make_meta ( [ ColumnType :: Int ] , [ 0 ] ) ;
478+ let pk = make_key ( [ 0i32 . to_be_bytes ( ) . as_ref ( ) ] ) ;
479+ let cells = PartitionKeyDecoder :: new ( & meta, & pk)
480+ . collect :: < Result < Vec < _ > , _ > > ( )
481+ . unwrap ( ) ;
482+ assert_eq ! ( cells, vec![ CqlValue :: Int ( 0 ) ] ) ;
483+ }
484+
485+ #[ test]
486+ fn test_pk_decoder_multiple_columns ( ) {
487+ let meta = make_meta ( std:: iter:: repeat ( ColumnType :: Int ) . take ( 3 ) , [ 0 , 1 , 2 ] ) ;
488+ let pk = make_key ( [
489+ 12i32 . to_be_bytes ( ) . as_ref ( ) ,
490+ 34i32 . to_be_bytes ( ) . as_ref ( ) ,
491+ 56i32 . to_be_bytes ( ) . as_ref ( ) ,
492+ ] ) ;
493+ let cells = PartitionKeyDecoder :: new ( & meta, & pk)
494+ . collect :: < Result < Vec < _ > , _ > > ( )
495+ . unwrap ( ) ;
496+ assert_eq ! (
497+ cells,
498+ vec![ CqlValue :: Int ( 12 ) , CqlValue :: Int ( 34 ) , CqlValue :: Int ( 56 ) ] ,
499+ ) ;
500+ }
501+
502+ #[ test]
503+ fn test_pk_decoder_multiple_columns_shuffled ( ) {
504+ let meta = make_meta (
505+ [
506+ ColumnType :: TinyInt ,
507+ ColumnType :: SmallInt ,
508+ ColumnType :: Int ,
509+ ColumnType :: BigInt ,
510+ ColumnType :: Blob ,
511+ ] ,
512+ [ 4 , 0 , 3 ] ,
513+ ) ;
514+ let pk = make_key ( [
515+ & [ 1 , 2 , 3 , 4 , 5 ] ,
516+ 67i8 . to_be_bytes ( ) . as_ref ( ) ,
517+ 89i64 . to_be_bytes ( ) . as_ref ( ) ,
518+ ] ) ;
519+ let cells = PartitionKeyDecoder :: new ( & meta, & pk)
520+ . collect :: < Result < Vec < _ > , _ > > ( )
521+ . unwrap ( ) ;
522+ assert_eq ! (
523+ cells,
524+ vec![
525+ CqlValue :: Blob ( vec![ 1 , 2 , 3 , 4 , 5 ] ) ,
526+ CqlValue :: TinyInt ( 67 ) ,
527+ CqlValue :: BigInt ( 89 ) ,
528+ ] ,
529+ ) ;
530+ }
531+ }
0 commit comments