@@ -15,7 +15,6 @@ use crate::errors::{
1515 ExecutionError , MetadataError , NewSessionError , PagerExecutionError , PrepareError ,
1616 RequestAttemptError , RequestError , SchemaAgreementError , TracingError , UseKeyspaceError ,
1717} ;
18- use crate :: frame:: response:: result;
1918use crate :: network:: tls:: TlsProvider ;
2019use crate :: network:: { Connection , ConnectionConfig , PoolConfig , VerifiedKeyspaceName } ;
2120use crate :: observability:: driver_tracing:: RequestSpan ;
@@ -30,21 +29,20 @@ use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
3029use crate :: policies:: speculative_execution;
3130use crate :: policies:: timestamp_generator:: TimestampGenerator ;
3231use crate :: response:: query_result:: { MaybeFirstRowError , QueryResult , RowsError } ;
33- use crate :: response:: { NonErrorQueryResponse , PagingState , PagingStateResponse , QueryResponse } ;
32+ use crate :: response:: { NonErrorQueryResponse , PagingState , PagingStateResponse } ;
3433use crate :: routing:: partitioner:: PartitionerName ;
3534use crate :: routing:: { Shard , ShardAwarePortRange } ;
3635use crate :: statement:: batch:: BoundBatch ;
3736use crate :: statement:: batch:: { Batch , BatchStatement } ;
3837use crate :: statement:: bound:: BoundStatement ;
3938use crate :: statement:: execute:: { Execute , ExecutePageable } ;
40- use crate :: statement:: prepared:: { PartitionKeyError , PreparedStatement } ;
39+ use crate :: statement:: prepared:: PreparedStatement ;
4140use crate :: statement:: unprepared:: Statement ;
42- use crate :: statement:: { Consistency , PageSize , StatementConfig } ;
41+ use crate :: statement:: { Consistency , StatementConfig } ;
4342use arc_swap:: ArcSwapOption ;
4443use futures:: future:: join_all;
4544use futures:: future:: try_join_all;
4645use itertools:: Itertools ;
47- use scylla_cql:: frame:: response:: NonErrorResponse ;
4846use scylla_cql:: serialize:: batch:: { BatchValues , BatchValuesIterator } ;
4947use scylla_cql:: serialize:: row:: SerializeRow ;
5048use std:: borrow:: { Borrow , Cow } ;
@@ -993,113 +991,7 @@ impl Session {
993991 Ok ( session)
994992 }
995993
996- /// Sends a request to the database.
997- /// Optionally continues fetching results from a saved point.
998- ///
999- /// This is now an internal method only.
1000- ///
1001- /// Tl;dr: use [Session::query_unpaged], [Session::query_single_page] or [Session::query_iter] instead.
1002- ///
1003- /// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
1004- /// that we need to require users to make a conscious decision to use paging or not. For that, we expose
1005- /// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
1006- /// should be made.
1007- pub ( crate ) async fn query (
1008- & self ,
1009- statement : & Statement ,
1010- values : impl SerializeRow ,
1011- page_size : Option < PageSize > ,
1012- paging_state : PagingState ,
1013- ) -> Result < ( QueryResult , PagingStateResponse ) , ExecutionError > {
1014- let execution_profile = statement
1015- . get_execution_profile_handle ( )
1016- . unwrap_or_else ( || self . get_default_execution_profile_handle ( ) )
1017- . access ( ) ;
1018-
1019- let statement_info = RoutingInfo {
1020- consistency : statement
1021- . config
1022- . consistency
1023- . unwrap_or ( execution_profile. consistency ) ,
1024- serial_consistency : statement
1025- . config
1026- . serial_consistency
1027- . unwrap_or ( execution_profile. serial_consistency ) ,
1028- ..Default :: default ( )
1029- } ;
1030-
1031- let span = RequestSpan :: new_query ( & statement. contents ) ;
1032- let span_ref = & span;
1033- let run_request_result = self
1034- . run_request (
1035- statement_info,
1036- & statement. config ,
1037- execution_profile,
1038- |connection : Arc < Connection > ,
1039- consistency : Consistency ,
1040- execution_profile : & ExecutionProfileInner | {
1041- let serial_consistency = statement
1042- . config
1043- . serial_consistency
1044- . unwrap_or ( execution_profile. serial_consistency ) ;
1045- // Needed to avoid moving query and values into async move block
1046- let statement_ref = & statement;
1047- let values_ref = & values;
1048- let paging_state_ref = & paging_state;
1049- async move {
1050- if values_ref. is_empty ( ) {
1051- span_ref. record_request_size ( 0 ) ;
1052- connection
1053- . query_raw_with_consistency (
1054- statement_ref,
1055- consistency,
1056- serial_consistency,
1057- page_size,
1058- paging_state_ref. clone ( ) ,
1059- )
1060- . await
1061- . and_then ( QueryResponse :: into_non_error_query_response)
1062- } else {
1063- let statement =
1064- connection. prepare ( statement_ref) . await ?. bind ( values_ref) ?;
1065- span_ref. record_request_size ( statement. values . buffer_size ( ) ) ;
1066- connection
1067- . execute_raw_with_consistency (
1068- & statement,
1069- consistency,
1070- serial_consistency,
1071- page_size,
1072- paging_state_ref. clone ( ) ,
1073- )
1074- . await
1075- . and_then ( QueryResponse :: into_non_error_query_response)
1076- }
1077- }
1078- } ,
1079- & span,
1080- )
1081- . instrument ( span. span ( ) . clone ( ) )
1082- . await ?;
1083-
1084- let response = match run_request_result {
1085- RunRequestResult :: IgnoredWriteError => NonErrorQueryResponse {
1086- response : NonErrorResponse :: Result ( result:: Result :: Void ) ,
1087- tracing_id : None ,
1088- warnings : Vec :: new ( ) ,
1089- } ,
1090- RunRequestResult :: Completed ( response) => response,
1091- } ;
1092-
1093- self . handle_set_keyspace_response ( & response) . await ?;
1094- self . handle_auto_await_schema_agreement ( & response) . await ?;
1095-
1096- let ( result, paging_state_response) = response. into_query_result_and_paging_state ( ) ?;
1097- span. record_result_fields ( & result) ;
1098-
1099- Ok ( ( result, paging_state_response) )
1100- }
1101-
1102- async fn handle_set_keyspace_response (
994+ pub ( crate ) async fn handle_set_keyspace_response (
1103995 & self ,
1104996 response : & NonErrorQueryResponse ,
1105997 ) -> Result < ( ) , UseKeyspaceError > {
@@ -1115,7 +1007,7 @@ impl Session {
11151007 Ok ( ( ) )
11161008 }
11171009
1118- async fn handle_auto_await_schema_agreement (
1010+ pub ( crate ) async fn handle_auto_await_schema_agreement (
11191011 & self ,
11201012 response : & NonErrorQueryResponse ,
11211013 ) -> Result < ( ) , ExecutionError > {
@@ -1282,116 +1174,6 @@ impl Session {
12821174 statement. execute_pageable :: < true > ( self , paging_state) . await
12831175 }
12841176
1285- /// Sends a prepared request to the database, optionally continuing from a saved point.
1286- ///
1287- /// This is now an internal method only.
1288- ///
1289- /// Tl;dr: use [Session::execute_unpaged], [Session::execute_single_page] or [Session::execute_iter] instead.
1290- ///
1291- /// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
1292- /// that we need to require users to make a conscious decision to use paging or not. For that, we expose
1293- /// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
1294- /// should be made.
1295- pub ( crate ) async fn execute_bound_statement (
1296- & self ,
1297- statement : & BoundStatement ,
1298- page_size : Option < PageSize > ,
1299- paging_state : PagingState ,
1300- ) -> Result < ( QueryResult , PagingStateResponse ) , ExecutionError > {
1301- let paging_state_ref = & paging_state;
1302-
1303- let ( partition_key, token) = statement
1304- . pk_and_token ( )
1305- . map_err ( PartitionKeyError :: into_execution_error) ?
1306- . unzip ( ) ;
1307-
1308- let execution_profile = statement
1309- . prepared
1310- . get_execution_profile_handle ( )
1311- . unwrap_or_else ( || self . get_default_execution_profile_handle ( ) )
1312- . access ( ) ;
1313-
1314- let table_spec = statement. prepared . get_table_spec ( ) ;
1315-
1316- let statement_info = RoutingInfo {
1317- consistency : statement
1318- . prepared
1319- . config
1320- . consistency
1321- . unwrap_or ( execution_profile. consistency ) ,
1322- serial_consistency : statement
1323- . prepared
1324- . config
1325- . serial_consistency
1326- . unwrap_or ( execution_profile. serial_consistency ) ,
1327- token,
1328- table : table_spec,
1329- is_confirmed_lwt : statement. prepared . is_confirmed_lwt ( ) ,
1330- } ;
1331-
1332- let span = RequestSpan :: new_prepared (
1333- partition_key. as_ref ( ) . map ( |pk| pk. iter ( ) ) ,
1334- token,
1335- statement. values . buffer_size ( ) ,
1336- ) ;
1337-
1338- if !span. span ( ) . is_disabled ( ) {
1339- if let ( Some ( table_spec) , Some ( token) ) = ( statement_info. table , token) {
1340- let cluster_state = self . get_cluster_state ( ) ;
1341- let replicas = cluster_state. get_token_endpoints_iter ( table_spec, token) ;
1342- span. record_replicas ( replicas)
1343- }
1344- }
1345-
1346- let run_request_result: RunRequestResult < NonErrorQueryResponse > = self
1347- . run_request (
1348- statement_info,
1349- & statement. prepared . config ,
1350- execution_profile,
1351- |connection : Arc < Connection > ,
1352- consistency : Consistency ,
1353- execution_profile : & ExecutionProfileInner | {
1354- let serial_consistency = statement
1355- . prepared
1356- . config
1357- . serial_consistency
1358- . unwrap_or ( execution_profile. serial_consistency ) ;
1359- async move {
1360- connection
1361- . execute_raw_with_consistency (
1362- statement,
1363- consistency,
1364- serial_consistency,
1365- page_size,
1366- paging_state_ref. clone ( ) ,
1367- )
1368- . await
1369- . and_then ( QueryResponse :: into_non_error_query_response)
1370- }
1371- } ,
1372- & span,
1373- )
1374- . instrument ( span. span ( ) . clone ( ) )
1375- . await ?;
1376-
1377- let response = match run_request_result {
1378- RunRequestResult :: IgnoredWriteError => NonErrorQueryResponse {
1379- response : NonErrorResponse :: Result ( result:: Result :: Void ) ,
1380- tracing_id : None ,
1381- warnings : Vec :: new ( ) ,
1382- } ,
1383- RunRequestResult :: Completed ( response) => response,
1384- } ;
1385-
1386- self . handle_set_keyspace_response ( & response) . await ?;
1387- self . handle_auto_await_schema_agreement ( & response) . await ?;
1388-
1389- let ( result, paging_state_response) = response. into_query_result_and_paging_state ( ) ?;
1390- span. record_result_fields ( & result) ;
1391-
1392- Ok ( ( result, paging_state_response) )
1393- }
1394-
13951177 async fn do_execute_iter (
13961178 & self ,
13971179 statement : BoundStatement ,
@@ -1413,76 +1195,6 @@ impl Session {
14131195 . map_err ( PagerExecutionError :: NextPageError )
14141196 }
14151197
1416- pub ( crate ) async fn do_batch ( & self , batch : & BoundBatch ) -> Result < QueryResult , ExecutionError > {
1417- // Shard-awareness behavior for batch will be to pick shard based on first batch statement's shard
1418- // If users batch statements by shard, they will be rewarded with full shard awareness
1419- let execution_profile = batch
1420- . get_execution_profile_handle ( )
1421- . unwrap_or_else ( || self . get_default_execution_profile_handle ( ) )
1422- . access ( ) ;
1423-
1424- let consistency = batch
1425- . config
1426- . consistency
1427- . unwrap_or ( execution_profile. consistency ) ;
1428-
1429- let serial_consistency = batch
1430- . config
1431- . serial_consistency
1432- . unwrap_or ( execution_profile. serial_consistency ) ;
1433-
1434- let ( table, token) = batch
1435- . first_prepared
1436- . as_ref ( )
1437- . and_then ( |( ps, token) | ps. get_table_spec ( ) . map ( |table| ( table, * token) ) )
1438- . unzip ( ) ;
1439-
1440- let statement_info = RoutingInfo {
1441- consistency,
1442- serial_consistency,
1443- token,
1444- table,
1445- is_confirmed_lwt : false ,
1446- } ;
1447-
1448- let span = RequestSpan :: new_batch ( ) ;
1449-
1450- let run_request_result: RunRequestResult < NonErrorQueryResponse > = self
1451- . run_request (
1452- statement_info,
1453- & batch. config ,
1454- execution_profile,
1455- |connection : Arc < Connection > ,
1456- consistency : Consistency ,
1457- execution_profile : & ExecutionProfileInner | {
1458- let serial_consistency = batch
1459- . config
1460- . serial_consistency
1461- . unwrap_or ( execution_profile. serial_consistency ) ;
1462- async move {
1463- connection
1464- . batch_with_consistency ( batch, consistency, serial_consistency)
1465- . await
1466- . and_then ( QueryResponse :: into_non_error_query_response)
1467- }
1468- } ,
1469- & span,
1470- )
1471- . instrument ( span. span ( ) . clone ( ) )
1472- . await ?;
1473-
1474- let result = match run_request_result {
1475- RunRequestResult :: IgnoredWriteError => QueryResult :: mock_empty ( ) ,
1476- RunRequestResult :: Completed ( non_error_query_response) => {
1477- let result = non_error_query_response. into_query_result ( ) ?;
1478- span. record_result_fields ( & result) ;
1479- result
1480- }
1481- } ;
1482-
1483- Ok ( result)
1484- }
1485-
14861198 /// Prepares all statements within the batch and returns a new batch where every
14871199 /// statement is prepared.
14881200 /// /// # Example
@@ -1767,7 +1479,7 @@ impl Session {
17671479 /// On success, this request's result is returned.
17681480 // I tried to make this closures take a reference instead of an Arc but failed
17691481 // maybe once async closures get stabilized this can be fixed
1770- async fn run_request < ' a , QueryFut , ResT > (
1482+ pub ( crate ) async fn run_request < ' a , QueryFut , ResT > (
17711483 & ' a self ,
17721484 statement_info : RoutingInfo < ' a > ,
17731485 statement_config : & ' a StatementConfig ,
0 commit comments