Skip to content

Commit 8481e96

Browse files
committed
move statement type specific request code to their module
this is a no-op the goal is just to make the code less tangled/nested
1 parent 739bce7 commit 8481e96

File tree

6 files changed

+366
-359
lines changed

6 files changed

+366
-359
lines changed

scylla/src/client/session.rs

Lines changed: 6 additions & 293 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use crate::errors::{
1515
ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
1616
RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
1717
};
18-
use crate::frame::response::result;
1918
use crate::network::tls::TlsProvider;
2019
use crate::network::{Connection, ConnectionConfig, PoolConfig, VerifiedKeyspaceName};
2120
use crate::observability::driver_tracing::RequestSpan;
@@ -31,22 +30,21 @@ use crate::policies::speculative_execution;
3130
use crate::policies::timestamp_generator::TimestampGenerator;
3231
use crate::response::query_result::{MaybeFirstRowError, QueryResult, RowsError};
3332
use crate::response::{
34-
NonErrorQueryResponse, PagingState, PagingStateResponse, QueryResponse, RawPreparedStatement,
33+
NonErrorQueryResponse, PagingState, PagingStateResponse, RawPreparedStatement,
3534
};
3635
use crate::routing::partitioner::PartitionerName;
3736
use crate::routing::{Shard, ShardAwarePortRange};
3837
use crate::statement::batch::BoundBatch;
3938
use crate::statement::batch::{Batch, BatchStatement};
4039
use crate::statement::bound::BoundStatement;
4140
use crate::statement::execute::{Execute, ExecutePageable};
42-
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
41+
use crate::statement::prepared::PreparedStatement;
4342
use crate::statement::unprepared::Statement;
44-
use crate::statement::{Consistency, PageSize, StatementConfig};
43+
use crate::statement::{Consistency, StatementConfig};
4544
use arc_swap::ArcSwapOption;
4645
use futures::future::join_all;
4746
use futures::future::try_join_all;
4847
use itertools::Itertools;
49-
use scylla_cql::frame::response::NonErrorResponse;
5048
use scylla_cql::serialize::batch::{BatchValues, BatchValuesIterator};
5149
use scylla_cql::serialize::row::SerializeRow;
5250
use std::borrow::{Borrow, Cow};
@@ -995,112 +993,7 @@ impl Session {
995993
Ok(session)
996994
}
997995

998-
/// Sends a request to the database.
999-
/// Optionally continues fetching results from a saved point.
1000-
///
1001-
/// This is now an internal method only.
1002-
///
1003-
/// Tl;dr: use [Session::query_unpaged], [Session::query_single_page] or [Session::query_iter] instead.
1004-
///
1005-
/// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
1006-
/// that we need to require users to make a conscious decision to use paging or not. For that, we expose
1007-
/// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
1008-
/// should be made.
1009-
pub(crate) async fn query(
1010-
&self,
1011-
statement: &Statement,
1012-
values: impl SerializeRow,
1013-
page_size: Option<PageSize>,
1014-
paging_state: PagingState,
1015-
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1016-
let execution_profile = statement
1017-
.get_execution_profile_handle()
1018-
.unwrap_or_else(|| self.get_default_execution_profile_handle())
1019-
.access();
1020-
1021-
let statement_info = RoutingInfo {
1022-
consistency: statement
1023-
.config
1024-
.consistency
1025-
.unwrap_or(execution_profile.consistency),
1026-
serial_consistency: statement
1027-
.config
1028-
.serial_consistency
1029-
.unwrap_or(execution_profile.serial_consistency),
1030-
..Default::default()
1031-
};
1032-
1033-
let span = RequestSpan::new_query(&statement.contents);
1034-
let span_ref = &span;
1035-
let run_request_result = self
1036-
.run_request(
1037-
statement_info,
1038-
&statement.config,
1039-
execution_profile,
1040-
|connection: Arc<Connection>,
1041-
consistency: Consistency,
1042-
execution_profile: &ExecutionProfileInner| {
1043-
let serial_consistency = statement
1044-
.config
1045-
.serial_consistency
1046-
.unwrap_or(execution_profile.serial_consistency);
1047-
// Needed to avoid moving query and values into async move block
1048-
let values_ref = &values;
1049-
let paging_state_ref = &paging_state;
1050-
async move {
1051-
if values_ref.is_empty() {
1052-
span_ref.record_request_size(0);
1053-
connection
1054-
.query_raw_with_consistency(
1055-
statement,
1056-
consistency,
1057-
serial_consistency,
1058-
page_size,
1059-
paging_state_ref.clone(),
1060-
)
1061-
.await
1062-
.and_then(QueryResponse::into_non_error_query_response)
1063-
} else {
1064-
let statement =
1065-
connection.prepare(statement).await?.bind(values_ref)?;
1066-
span_ref.record_request_size(statement.values.buffer_size());
1067-
connection
1068-
.execute_raw_with_consistency(
1069-
&statement,
1070-
consistency,
1071-
serial_consistency,
1072-
page_size,
1073-
paging_state_ref.clone(),
1074-
)
1075-
.await
1076-
.and_then(QueryResponse::into_non_error_query_response)
1077-
}
1078-
}
1079-
},
1080-
&span,
1081-
)
1082-
.instrument(span.span().clone())
1083-
.await?;
1084-
1085-
let response = match run_request_result {
1086-
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1087-
response: NonErrorResponse::Result(result::Result::Void),
1088-
tracing_id: None,
1089-
warnings: Vec::new(),
1090-
},
1091-
RunRequestResult::Completed(response) => response,
1092-
};
1093-
1094-
self.handle_set_keyspace_response(&response).await?;
1095-
self.handle_auto_await_schema_agreement(&response).await?;
1096-
1097-
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
1098-
span.record_result_fields(&result);
1099-
1100-
Ok((result, paging_state_response))
1101-
}
1102-
1103-
async fn handle_set_keyspace_response(
996+
pub(crate) async fn handle_set_keyspace_response(
1104997
&self,
1105998
response: &NonErrorQueryResponse,
1106999
) -> Result<(), UseKeyspaceError> {
@@ -1116,7 +1009,7 @@ impl Session {
11161009
Ok(())
11171010
}
11181011

1119-
async fn handle_auto_await_schema_agreement(
1012+
pub(crate) async fn handle_auto_await_schema_agreement(
11201013
&self,
11211014
response: &NonErrorQueryResponse,
11221015
) -> Result<(), ExecutionError> {
@@ -1282,116 +1175,6 @@ impl Session {
12821175
statement.execute_pageable::<true>(self, paging_state).await
12831176
}
12841177

1285-
/// Sends a prepared request to the database, optionally continuing from a saved point.
1286-
///
1287-
/// This is now an internal method only.
1288-
///
1289-
/// Tl;dr: use [Session::execute_unpaged], [Session::execute_single_page] or [Session::execute_iter] instead.
1290-
///
1291-
/// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
1292-
/// that we need to require users to make a conscious decision to use paging or not. For that, we expose
1293-
/// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
1294-
/// should be made.
1295-
pub(crate) async fn execute_bound_statement(
1296-
&self,
1297-
statement: &BoundStatement,
1298-
page_size: Option<PageSize>,
1299-
paging_state: PagingState,
1300-
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1301-
let paging_state_ref = &paging_state;
1302-
1303-
let (partition_key, token) = statement
1304-
.pk_and_token()
1305-
.map_err(PartitionKeyError::into_execution_error)?
1306-
.unzip();
1307-
1308-
let execution_profile = statement
1309-
.prepared
1310-
.get_execution_profile_handle()
1311-
.unwrap_or_else(|| self.get_default_execution_profile_handle())
1312-
.access();
1313-
1314-
let table_spec = statement.prepared.get_table_spec();
1315-
1316-
let statement_info = RoutingInfo {
1317-
consistency: statement
1318-
.prepared
1319-
.config
1320-
.consistency
1321-
.unwrap_or(execution_profile.consistency),
1322-
serial_consistency: statement
1323-
.prepared
1324-
.config
1325-
.serial_consistency
1326-
.unwrap_or(execution_profile.serial_consistency),
1327-
token,
1328-
table: table_spec,
1329-
is_confirmed_lwt: statement.prepared.is_confirmed_lwt(),
1330-
};
1331-
1332-
let span = RequestSpan::new_prepared(
1333-
partition_key.as_ref().map(|pk| pk.iter()),
1334-
token,
1335-
statement.values.buffer_size(),
1336-
);
1337-
1338-
if !span.span().is_disabled() {
1339-
if let (Some(table_spec), Some(token)) = (statement_info.table, token) {
1340-
let cluster_state = self.get_cluster_state();
1341-
let replicas = cluster_state.get_token_endpoints_iter(table_spec, token);
1342-
span.record_replicas(replicas)
1343-
}
1344-
}
1345-
1346-
let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
1347-
.run_request(
1348-
statement_info,
1349-
&statement.prepared.config,
1350-
execution_profile,
1351-
|connection: Arc<Connection>,
1352-
consistency: Consistency,
1353-
execution_profile: &ExecutionProfileInner| {
1354-
let serial_consistency = statement
1355-
.prepared
1356-
.config
1357-
.serial_consistency
1358-
.unwrap_or(execution_profile.serial_consistency);
1359-
async move {
1360-
connection
1361-
.execute_raw_with_consistency(
1362-
statement,
1363-
consistency,
1364-
serial_consistency,
1365-
page_size,
1366-
paging_state_ref.clone(),
1367-
)
1368-
.await
1369-
.and_then(QueryResponse::into_non_error_query_response)
1370-
}
1371-
},
1372-
&span,
1373-
)
1374-
.instrument(span.span().clone())
1375-
.await?;
1376-
1377-
let response = match run_request_result {
1378-
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1379-
response: NonErrorResponse::Result(result::Result::Void),
1380-
tracing_id: None,
1381-
warnings: Vec::new(),
1382-
},
1383-
RunRequestResult::Completed(response) => response,
1384-
};
1385-
1386-
self.handle_set_keyspace_response(&response).await?;
1387-
self.handle_auto_await_schema_agreement(&response).await?;
1388-
1389-
let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
1390-
span.record_result_fields(&result);
1391-
1392-
Ok((result, paging_state_response))
1393-
}
1394-
13951178
async fn do_execute_iter(
13961179
&self,
13971180
statement: BoundStatement,
@@ -1413,76 +1196,6 @@ impl Session {
14131196
.map_err(PagerExecutionError::NextPageError)
14141197
}
14151198

1416-
pub(crate) async fn do_batch(&self, batch: &BoundBatch) -> Result<QueryResult, ExecutionError> {
1417-
// Shard-awareness behavior for batch will be to pick shard based on first batch statement's shard
1418-
// If users batch statements by shard, they will be rewarded with full shard awareness
1419-
let execution_profile = batch
1420-
.get_execution_profile_handle()
1421-
.unwrap_or_else(|| self.get_default_execution_profile_handle())
1422-
.access();
1423-
1424-
let consistency = batch
1425-
.config
1426-
.consistency
1427-
.unwrap_or(execution_profile.consistency);
1428-
1429-
let serial_consistency = batch
1430-
.config
1431-
.serial_consistency
1432-
.unwrap_or(execution_profile.serial_consistency);
1433-
1434-
let (table, token) = batch
1435-
.first_prepared
1436-
.as_ref()
1437-
.and_then(|(ps, token)| ps.get_table_spec().map(|table| (table, *token)))
1438-
.unzip();
1439-
1440-
let statement_info = RoutingInfo {
1441-
consistency,
1442-
serial_consistency,
1443-
token,
1444-
table,
1445-
is_confirmed_lwt: false,
1446-
};
1447-
1448-
let span = RequestSpan::new_batch();
1449-
1450-
let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
1451-
.run_request(
1452-
statement_info,
1453-
&batch.config,
1454-
execution_profile,
1455-
|connection: Arc<Connection>,
1456-
consistency: Consistency,
1457-
execution_profile: &ExecutionProfileInner| {
1458-
let serial_consistency = batch
1459-
.config
1460-
.serial_consistency
1461-
.unwrap_or(execution_profile.serial_consistency);
1462-
async move {
1463-
connection
1464-
.batch_with_consistency(batch, consistency, serial_consistency)
1465-
.await
1466-
.and_then(QueryResponse::into_non_error_query_response)
1467-
}
1468-
},
1469-
&span,
1470-
)
1471-
.instrument(span.span().clone())
1472-
.await?;
1473-
1474-
let result = match run_request_result {
1475-
RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(),
1476-
RunRequestResult::Completed(non_error_query_response) => {
1477-
let result = non_error_query_response.into_query_result()?;
1478-
span.record_result_fields(&result);
1479-
result
1480-
}
1481-
};
1482-
1483-
Ok(result)
1484-
}
1485-
14861199
/// Prepares all statements within the batch and returns a new batch where every
14871200
/// statement is prepared.
14881201
/// /// # Example
@@ -1767,7 +1480,7 @@ impl Session {
17671480
/// On success, this request's result is returned.
17681481
// I tried to make this closures take a reference instead of an Arc but failed
17691482
// maybe once async closures get stabilized this can be fixed
1770-
async fn run_request<'a, QueryFut, ResT>(
1483+
pub(crate) async fn run_request<'a, QueryFut, ResT>(
17711484
&'a self,
17721485
statement_info: RoutingInfo<'a>,
17731486
statement_config: &'a StatementConfig,

scylla/src/network/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1115,7 +1115,7 @@ impl Connection {
11151115
consistency,
11161116
serial_consistency,
11171117
timestamp,
1118-
statements_len: batch.statements_len,
1118+
statements_len: batch.statements_len(),
11191119
};
11201120

11211121
loop {

0 commit comments

Comments
 (0)