@@ -36,6 +36,7 @@ use crate::routing::{Shard, ShardAwarePortRange};
3636use crate :: statement:: batch:: BoundBatch ;
3737use crate :: statement:: batch:: { Batch , BatchStatement } ;
3838use crate :: statement:: bound:: BoundStatement ;
39+ use crate :: statement:: execute:: { Execute , ExecutePageable } ;
3940use crate :: statement:: prepared:: { PartitionKeyError , PreparedStatement } ;
4041use crate :: statement:: unprepared:: Statement ;
4142use crate :: statement:: { Consistency , PageSize , StatementConfig } ;
@@ -56,7 +57,7 @@ use std::time::Duration;
5657use tokio:: time:: timeout;
5758#[ cfg( feature = "unstable-cloud" ) ]
5859use tracing:: warn;
59- use tracing:: { debug, error , trace, trace_span, Instrument } ;
60+ use tracing:: { debug, trace, trace_span, Instrument } ;
6061use uuid:: Uuid ;
6162
6263pub ( crate ) const TABLET_CHANNEL_SIZE : usize = 8192 ;
@@ -481,7 +482,8 @@ impl Session {
481482 statement : impl Into < Statement > ,
482483 values : impl SerializeRow ,
483484 ) -> Result < QueryResult , ExecutionError > {
484- self . do_query_unpaged ( & statement. into ( ) , values) . await
485+ let statement = statement. into ( ) ;
486+ ( & statement, values) . execute ( self ) . await
485487 }
486488
487489 /// Queries a single page from the database, optionally continuing from a saved point.
@@ -541,7 +543,9 @@ impl Session {
541543 values : impl SerializeRow ,
542544 paging_state : PagingState ,
543545 ) -> Result < ( QueryResult , PagingStateResponse ) , ExecutionError > {
544- self . do_query_single_page ( & statement. into ( ) , values, paging_state)
546+ let statement = statement. into ( ) ;
547+ ( & statement, values)
548+ . execute_pageable :: < true > ( self , paging_state)
545549 . await
546550 }
547551
@@ -806,9 +810,9 @@ impl Session {
806810 values : impl BatchValues ,
807811 ) -> Result < QueryResult , ExecutionError > {
808812 let batch = self . last_minute_prepare_batch ( batch, & values) . await ?;
809- let batch = BoundBatch :: from_batch ( batch. as_ref ( ) , values) ?;
810-
811- self . do_batch ( & batch ) . await
813+ BoundBatch :: from_batch ( batch. as_ref ( ) , values) ?
814+ . execute ( self )
815+ . await
812816 }
813817}
814818
@@ -989,38 +993,6 @@ impl Session {
989993 Ok ( session)
990994 }
991995
992- async fn do_query_unpaged (
993- & self ,
994- statement : & Statement ,
995- values : impl SerializeRow ,
996- ) -> Result < QueryResult , ExecutionError > {
997- let ( result, paging_state_response) = self
998- . query ( statement, values, None , PagingState :: start ( ) )
999- . await ?;
1000- if !paging_state_response. finished ( ) {
1001- error ! ( "Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug." ) ;
1002- return Err ( ExecutionError :: LastAttemptError (
1003- RequestAttemptError :: NonfinishedPagingState ,
1004- ) ) ;
1005- }
1006- Ok ( result)
1007- }
1008-
1009- async fn do_query_single_page (
1010- & self ,
1011- statement : & Statement ,
1012- values : impl SerializeRow ,
1013- paging_state : PagingState ,
1014- ) -> Result < ( QueryResult , PagingStateResponse ) , ExecutionError > {
1015- self . query (
1016- statement,
1017- values,
1018- Some ( statement. get_validated_page_size ( ) ) ,
1019- paging_state,
1020- )
1021- . await
1022- }
1023-
1024996 /// Sends a request to the database.
1025997 /// Optionally continues fetching results from a saved point.
1026998 ///
@@ -1032,7 +1004,7 @@ impl Session {
10321004 /// that we need to require users to make a conscious decision to use paging or not. For that, we expose
10331005 /// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
10341006 /// should be made.
1035- async fn query (
1007+ pub ( crate ) async fn query (
10361008 & self ,
10371009 statement : & Statement ,
10381010 values : impl SerializeRow ,
@@ -1299,23 +1271,15 @@ impl Session {
12991271 & self ,
13001272 statement : & BoundStatement ,
13011273 ) -> Result < QueryResult , ExecutionError > {
1302- let ( result, paging_state) = self . execute ( statement, None , PagingState :: start ( ) ) . await ?;
1303- if !paging_state. finished ( ) {
1304- error ! ( "Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug." ) ;
1305- return Err ( ExecutionError :: LastAttemptError (
1306- RequestAttemptError :: NonfinishedPagingState ,
1307- ) ) ;
1308- }
1309- Ok ( result)
1274+ statement. execute ( self ) . await
13101275 }
13111276
13121277 async fn do_execute_single_page (
13131278 & self ,
13141279 statement : & BoundStatement ,
13151280 paging_state : PagingState ,
13161281 ) -> Result < ( QueryResult , PagingStateResponse ) , ExecutionError > {
1317- let page_size = statement. prepared . get_validated_page_size ( ) ;
1318- self . execute ( statement, Some ( page_size) , paging_state) . await
1282+ statement. execute_pageable :: < true > ( self , paging_state) . await
13191283 }
13201284
13211285 /// Sends a prepared request to the database, optionally continuing from a saved point.
@@ -1328,7 +1292,7 @@ impl Session {
13281292 /// that we need to require users to make a conscious decision to use paging or not. For that, we expose
13291293 /// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
13301294 /// should be made.
1331- async fn execute (
1295+ pub ( crate ) async fn execute_bound_statement (
13321296 & self ,
13331297 statement : & BoundStatement ,
13341298 page_size : Option < PageSize > ,
@@ -1449,7 +1413,7 @@ impl Session {
14491413 . map_err ( PagerExecutionError :: NextPageError )
14501414 }
14511415
1452- async fn do_batch ( & self , batch : & BoundBatch ) -> Result < QueryResult , ExecutionError > {
1416+ pub ( crate ) async fn do_batch ( & self , batch : & BoundBatch ) -> Result < QueryResult , ExecutionError > {
14531417 // Shard-awareness behavior for batch will be to pick shard based on first batch statement's shard
14541418 // If users batch statements by shard, they will be rewarded with full shard awareness
14551419 let execution_profile = batch
@@ -1750,10 +1714,10 @@ impl Session {
17501714 traces_events_query. config . consistency = consistency;
17511715 traces_events_query. set_page_size ( TRACING_QUERY_PAGE_SIZE ) ;
17521716
1753- let ( traces_session_res , traces_events_res ) = tokio :: try_join! (
1754- self . do_query_unpaged ( & traces_session_query , ( tracing_id, ) ) ,
1755- self . do_query_unpaged ( & traces_events_query , ( tracing_id , ) )
1756- ) ?;
1717+ let session_query = ( & traces_session_query , ( tracing_id , ) ) ;
1718+ let events_query = ( & traces_events_query , ( tracing_id, ) ) ;
1719+ let ( traces_session_res , traces_events_res ) =
1720+ tokio :: try_join! ( session_query . execute ( self ) , events_query . execute ( self ) ) ?;
17571721
17581722 // Get tracing info
17591723 let maybe_tracing_info: Option < TracingInfo > = traces_session_res
0 commit comments