@@ -7,10 +7,16 @@ use scylla::routing::Token;
77use scylla:: serialize:: row:: SerializeRow ;
88use scylla:: statement:: prepared:: PreparedStatement ;
99use scylla:: statement:: Statement ;
10+ use scylla_cql:: frame:: types;
11+ use scylla_proxy:: {
12+ Condition , ProxyError , Reaction , ResponseFrame , ResponseOpcode , ResponseReaction , ResponseRule ,
13+ ShardAwareness , TargetShard , WorkerError ,
14+ } ;
15+ use std:: sync:: Arc ;
1016
1117use crate :: utils:: {
12- create_new_session_builder, scylla_supports_tablets, setup_tracing, unique_keyspace_name ,
13- PerformDDL as _,
18+ create_new_session_builder, scylla_supports_tablets, setup_tracing, test_with_3_node_cluster ,
19+ unique_keyspace_name , PerformDDL as _,
1420} ;
1521
1622#[ tokio:: test]
@@ -469,3 +475,158 @@ async fn test_prepared_statement_col_specs() {
469475 ] ;
470476 assert_eq ! ( result_set_col_specs, expected_result_set_col_specs) ;
471477}
478+
479+ #[ tokio:: test]
480+ #[ ntest:: timeout( 20000 ) ]
481+ #[ cfg_attr( scylla_cloud_tests, ignore) ]
482+ async fn test_skip_result_metadata ( ) {
483+ use scylla:: client:: session:: Session ;
484+ use scylla:: client:: session_builder:: SessionBuilder ;
485+
486+ setup_tracing ( ) ;
487+
488+ const NO_METADATA_FLAG : i32 = 0x0004 ;
489+
490+ let res = test_with_3_node_cluster ( ShardAwareness :: QueryNode , |proxy_uris, translation_map, mut running_proxy| async move {
491+ // DB preparation phase
492+ let session: Session = SessionBuilder :: new ( )
493+ . known_node ( proxy_uris[ 0 ] . as_str ( ) )
494+ . address_translator ( Arc :: new ( translation_map) )
495+ . build ( )
496+ . await
497+ . unwrap ( ) ;
498+
499+ let ks = unique_keyspace_name ( ) ;
500+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}" , ks) ) . await . unwrap ( ) ;
501+ session. use_keyspace ( ks, false ) . await . unwrap ( ) ;
502+ session
503+ . ddl ( "CREATE TABLE t (a int primary key, b int, c text)" )
504+ . await
505+ . unwrap ( ) ;
506+ session. query_unpaged ( "INSERT INTO t (a, b, c) VALUES (1, 2, 'foo_filter_data')" , & [ ] ) . await . unwrap ( ) ;
507+
508+ let mut prepared = session. prepare ( "SELECT a, b, c FROM t" ) . await . unwrap ( ) ;
509+
510+ let ( tx, mut rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
511+
512+ // We inserted this string to filter responses
513+ let body_rows = b"foo_filter_data" ;
514+ for node in running_proxy. running_nodes . iter_mut ( ) {
515+ let rule = ResponseRule (
516+ Condition :: ResponseOpcode ( ResponseOpcode :: Result ) . and ( Condition :: BodyContainsCaseSensitive ( Box :: new ( * body_rows) ) ) ,
517+ ResponseReaction :: noop ( ) . with_feedback_when_performed ( tx. clone ( ) )
518+ ) ;
519+ node. change_response_rules ( Some ( vec ! [ rule] ) ) ;
520+ }
521+
522+ async fn test_with_flags_predicate (
523+ session : & Session ,
524+ prepared : & PreparedStatement ,
525+ rx : & mut tokio:: sync:: mpsc:: UnboundedReceiver < ( ResponseFrame , Option < TargetShard > ) > ,
526+ predicate : impl FnOnce ( i32 ) -> bool
527+ ) {
528+ session. execute_unpaged ( prepared, & [ ] ) . await . unwrap ( ) ;
529+
530+ let ( frame, _shard) = rx. recv ( ) . await . unwrap ( ) ;
531+ let mut buf = & * frame. body ;
532+
533+ // FIXME: make use of scylla_cql::frame utilities, instead of deserializing frame manually.
534+ // This will probably be possible once https://github.com/scylladb/scylla-rust-driver/issues/462 is fixed.
535+ match types:: read_int ( & mut buf) . unwrap ( ) {
536+ 0x0002 => ( ) ,
537+ _ => panic ! ( "Invalid result type" ) ,
538+ }
539+ let result_metadata_flags = types:: read_int ( & mut buf) . unwrap ( ) ;
540+ assert ! ( predicate( result_metadata_flags) ) ;
541+ }
542+
543+ // Verify that server sends metadata when driver doesn't send SKIP_METADATA flag.
544+ prepared. set_use_cached_result_metadata ( false ) ;
545+ test_with_flags_predicate ( & session, & prepared, & mut rx, |flags| flags & NO_METADATA_FLAG == 0 ) . await ;
546+
547+ // Verify that server doesn't send metadata when driver sends SKIP_METADATA flag.
548+ prepared. set_use_cached_result_metadata ( true ) ;
549+ test_with_flags_predicate ( & session, & prepared, & mut rx, |flags| flags & NO_METADATA_FLAG != 0 ) . await ;
550+
551+ // Verify that the optimisation does not break paging
552+ {
553+ let ks = unique_keyspace_name ( ) ;
554+
555+ session. ddl ( format ! ( "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}" , ks) ) . await . unwrap ( ) ;
556+ session. use_keyspace ( ks, true ) . await . unwrap ( ) ;
557+
558+ type RowT = ( i32 , i32 , String ) ;
559+ session
560+ . ddl (
561+ "CREATE TABLE IF NOT EXISTS t2 (a int, b int, c text, primary key (a, b))" ,
562+ )
563+ . await
564+ . unwrap ( ) ;
565+
566+ let insert_stmt = session
567+ . prepare ( "INSERT INTO t2 (a, b, c) VALUES (?, ?, ?)" )
568+ . await
569+ . unwrap ( ) ;
570+
571+ for idx in 0 ..10 {
572+ session
573+ . execute_unpaged ( & insert_stmt, ( idx, idx + 1 , "Some text" ) )
574+ . await
575+ . unwrap ( ) ;
576+ }
577+
578+ {
579+ let select_query = "SELECT a, b, c FROM t2" ;
580+
581+ let rs = session
582+ . query_unpaged ( select_query, ( ) )
583+ . await
584+ . unwrap ( )
585+ . into_rows_result ( )
586+ . unwrap ( )
587+ . rows :: < RowT > ( )
588+ . unwrap ( )
589+ . collect :: < Result < Vec < _ > , _ > > ( )
590+ . unwrap ( ) ;
591+
592+ let mut results_from_manual_paging: Vec < RowT > = vec ! [ ] ;
593+ let mut prepared_paged = session. prepare ( select_query) . await . unwrap ( ) ;
594+ prepared_paged. set_use_cached_result_metadata ( true ) ;
595+ prepared_paged. set_page_size ( 1 ) ;
596+ let mut paging_state = PagingState :: start ( ) ;
597+ let mut watchdog = 0 ;
598+ loop {
599+ let ( rs_manual, paging_state_response) = session
600+ . execute_single_page ( & prepared_paged, & [ ] , paging_state)
601+ . await
602+ . unwrap ( ) ;
603+ results_from_manual_paging. extend (
604+ rs_manual. into_rows_result ( )
605+ . unwrap ( )
606+ . rows :: < RowT > ( )
607+ . unwrap ( )
608+ . map ( Result :: unwrap)
609+ ) ;
610+
611+ match paging_state_response {
612+ PagingStateResponse :: HasMorePages { state } => {
613+ paging_state = state;
614+ }
615+ _ if watchdog > 30 => break ,
616+ PagingStateResponse :: NoMorePages => break ,
617+ }
618+ watchdog += 1 ;
619+ }
620+ assert_eq ! ( results_from_manual_paging, rs) ;
621+ }
622+ }
623+
624+ running_proxy
625+ } ) . await ;
626+
627+ match res {
628+ Ok ( ( ) ) => ( ) ,
629+ Err ( ProxyError :: Worker ( WorkerError :: DriverDisconnected ( _) ) ) => ( ) ,
630+ Err ( err) => panic ! ( "{}" , err) ,
631+ }
632+ }
0 commit comments