Skip to content

Commit f2385d6

Browse files
committed
Add an internal only BoundStatement
this struct keeps track of a PreparedStatement and SerializedValues
1 parent e732aaa commit f2385d6

File tree

8 files changed

+186
-153
lines changed

8 files changed

+186
-153
lines changed

scylla/src/client/pager.rs

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use scylla_cql::frame::request::query::PagingState;
1818
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
1919
use scylla_cql::frame::response::NonErrorResponse;
2020
use scylla_cql::frame::types::SerialConsistency;
21-
use scylla_cql::serialize::row::SerializedValues;
2221
use scylla_cql::Consistency;
2322
use std::result::Result;
2423
use thiserror::Error;
@@ -38,7 +37,8 @@ use crate::policies::load_balancing::{self, LoadBalancingPolicy, RoutingInfo};
3837
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
3938
use crate::response::query_result::ColumnSpecs;
4039
use crate::response::{NonErrorQueryResponse, QueryResponse};
41-
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
40+
use crate::statement::bound::BoundStatement;
41+
use crate::statement::prepared::PartitionKeyError;
4242
use crate::statement::unprepared::Statement;
4343
use tracing::{trace, trace_span, warn, Instrument};
4444
use uuid::Uuid;
@@ -64,8 +64,7 @@ struct ReceivedPage {
6464
}
6565

6666
pub(crate) struct PreparedPagerConfig {
67-
pub(crate) prepared: PreparedStatement,
68-
pub(crate) values: SerializedValues,
67+
pub(crate) bound: BoundStatement<'static>,
6968
pub(crate) execution_profile: Arc<ExecutionProfileInner>,
7069
pub(crate) cluster_state: Arc<ClusterState>,
7170
#[cfg(feature = "metrics")]
@@ -803,26 +802,30 @@ impl QueryPager {
803802
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
804803

805804
let consistency = config
805+
.bound
806806
.prepared
807807
.config
808808
.consistency
809809
.unwrap_or(config.execution_profile.consistency);
810810
let serial_consistency = config
811+
.bound
811812
.prepared
812813
.config
813814
.serial_consistency
814815
.unwrap_or(config.execution_profile.serial_consistency);
815816

816-
let page_size = config.prepared.get_validated_page_size();
817+
let page_size = config.bound.prepared.get_validated_page_size();
817818

818819
let load_balancing_policy = Arc::clone(
819820
config
821+
.bound
820822
.prepared
821823
.get_load_balancing_policy()
822824
.unwrap_or(&config.execution_profile.load_balancing_policy),
823825
);
824826

825827
let retry_session = config
828+
.bound
826829
.prepared
827830
.get_retry_policy()
828831
.map(|rp| &**rp)
@@ -831,14 +834,7 @@ impl QueryPager {
831834

832835
let parent_span = tracing::Span::current();
833836
let worker_task = async move {
834-
let prepared_ref = &config.prepared;
835-
let values_ref = &config.values;
836-
837-
let (partition_key, token) = match prepared_ref
838-
.extract_partition_key_and_calculate_token(
839-
prepared_ref.get_partitioner_name(),
840-
values_ref,
841-
) {
837+
let (partition_key, token) = match config.bound.pk_and_token() {
842838
Ok(res) => res.unzip(),
843839
Err(err) => {
844840
let (proof, _res) = ProvingSender::from(sender)
@@ -848,22 +844,22 @@ impl QueryPager {
848844
}
849845
};
850846

851-
let table_spec = config.prepared.get_table_spec();
847+
let table_spec = config.bound.prepared.get_table_spec();
852848
let statement_info = RoutingInfo {
853849
consistency,
854850
serial_consistency,
855851
token,
856852
table: table_spec,
857-
is_confirmed_lwt: config.prepared.is_confirmed_lwt(),
853+
is_confirmed_lwt: config.bound.prepared.is_confirmed_lwt(),
858854
};
859855

856+
let statement = &config.bound;
860857
let page_query = |connection: Arc<Connection>,
861858
consistency: Consistency,
862859
paging_state: PagingState| async move {
863860
connection
864861
.execute_raw_with_consistency(
865-
prepared_ref,
866-
values_ref,
862+
statement,
867863
consistency,
868864
serial_consistency,
869865
Some(page_size),
@@ -872,7 +868,7 @@ impl QueryPager {
872868
.await
873869
};
874870

875-
let serialized_values_size = config.values.buffer_size();
871+
let serialized_values_size = config.bound.values.buffer_size();
876872

877873
let replicas: Option<smallvec::SmallVec<[_; 8]>> =
878874
if let (Some(table_spec), Some(token)) =
@@ -905,14 +901,14 @@ impl QueryPager {
905901
sender: sender.into(),
906902
page_query,
907903
statement_info,
908-
query_is_idempotent: config.prepared.config.is_idempotent,
904+
query_is_idempotent: config.bound.prepared.config.is_idempotent,
909905
query_consistency: consistency,
910906
load_balancing_policy,
911907
retry_session,
912908
#[cfg(feature = "metrics")]
913909
metrics: config.metrics,
914910
paging_state: PagingState::start(),
915-
history_listener: config.prepared.config.history_listener.clone(),
911+
history_listener: config.bound.prepared.config.history_listener.clone(),
916912
current_request_id: None,
917913
current_attempt_id: None,
918914
parent_span,
@@ -955,23 +951,21 @@ impl QueryPager {
955951
}
956952

957953
pub(crate) async fn new_for_connection_execute_iter(
958-
prepared: PreparedStatement,
959-
values: SerializedValues,
954+
bound: BoundStatement<'static>,
960955
connection: Arc<Connection>,
961956
consistency: Consistency,
962957
serial_consistency: Option<SerialConsistency>,
963958
) -> Result<Self, NextPageError> {
964959
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
965960

966-
let page_size = prepared.get_validated_page_size();
961+
let page_size = bound.prepared.get_validated_page_size();
967962

968963
let worker_task = async move {
969964
let worker = SingleConnectionPagerWorker {
970965
sender: sender.into(),
971966
fetcher: |paging_state| {
972967
connection.execute_raw_with_consistency(
973-
&prepared,
974-
&values,
968+
&bound,
975969
consistency,
976970
serial_consistency,
977971
Some(page_size),

scylla/src/client/session.rs

Lines changed: 38 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::routing::partitioner::PartitionerName;
3838
use crate::routing::{Shard, ShardAwarePortRange};
3939
use crate::statement::batch::batch_values;
4040
use crate::statement::batch::{Batch, BatchStatement};
41+
use crate::statement::bound::BoundStatement;
4142
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
4243
use crate::statement::unprepared::Statement;
4344
use crate::statement::{Consistency, PageSize, StatementConfig};
@@ -47,7 +48,7 @@ use futures::future::try_join_all;
4748
use itertools::Itertools;
4849
use scylla_cql::frame::response::NonErrorResponse;
4950
use scylla_cql::serialize::batch::BatchValues;
50-
use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
51+
use scylla_cql::serialize::row::SerializeRow;
5152
use std::borrow::Borrow;
5253
use std::future::Future;
5354
use std::net::{IpAddr, SocketAddr};
@@ -640,7 +641,8 @@ impl Session {
640641
prepared: &PreparedStatement,
641642
values: impl SerializeRow,
642643
) -> Result<QueryResult, ExecutionError> {
643-
self.do_execute_unpaged(prepared, values).await
644+
let bound = prepared.bind(&values)?;
645+
self.do_execute_unpaged(&bound).await
644646
}
645647

646648
/// Executes a prepared statement, restricting results to single page.
@@ -705,8 +707,8 @@ impl Session {
705707
values: impl SerializeRow,
706708
paging_state: PagingState,
707709
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
708-
self.do_execute_single_page(prepared, values, paging_state)
709-
.await
710+
let bound = prepared.bind(&values)?;
711+
self.do_execute_single_page(&bound, paging_state).await
710712
}
711713

712714
/// Execute a prepared statement with paging.\
@@ -753,7 +755,8 @@ impl Session {
753755
prepared: impl Into<PreparedStatement>,
754756
values: impl SerializeRow,
755757
) -> Result<QueryPager, PagerExecutionError> {
756-
self.do_execute_iter(prepared.into(), values).await
758+
let bound = prepared.into().into_bind(&values)?;
759+
self.do_execute_iter(bound).await
757760
}
758761

759762
/// Execute a batch statement\
@@ -1085,12 +1088,11 @@ impl Session {
10851088
.and_then(QueryResponse::into_non_error_query_response)
10861089
} else {
10871090
let prepared = connection.prepare(statement).await?;
1088-
let serialized = prepared.serialize_values(values_ref)?;
1089-
span_ref.record_request_size(serialized.buffer_size());
1091+
let bound = prepared.bind(values_ref)?;
1092+
span_ref.record_request_size(bound.values.buffer_size());
10901093
connection
10911094
.execute_raw_with_consistency(
1092-
&prepared,
1093-
&serialized,
1095+
&bound,
10941096
consistency,
10951097
serial_consistency,
10961098
page_size,
@@ -1183,11 +1185,9 @@ impl Session {
11831185
// Making QueryPager::new_for_query work with values is too hard (if even possible)
11841186
// so instead of sending one prepare to a specific connection on each iterator query,
11851187
// we fully prepare a statement beforehand.
1186-
let prepared = self.prepare_nongeneric(&statement).await?;
1187-
let values = prepared.serialize_values(&values)?;
1188+
let bound = self.prepare_nongeneric(&statement).await?.into_bind(&values)?;
11881189
QueryPager::new_for_prepared_statement(PreparedPagerConfig {
1189-
prepared,
1190-
values,
1190+
bound,
11911191
execution_profile,
11921192
cluster_state: self.cluster.get_state(),
11931193
#[cfg(feature = "metrics")]
@@ -1356,13 +1356,9 @@ impl Session {
13561356

13571357
async fn do_execute_unpaged(
13581358
&self,
1359-
prepared: &PreparedStatement,
1360-
values: impl SerializeRow,
1359+
bound: &BoundStatement<'_>,
13611360
) -> Result<QueryResult, ExecutionError> {
1362-
let serialized_values = prepared.serialize_values(&values)?;
1363-
let (result, paging_state) = self
1364-
.execute(prepared, &serialized_values, None, PagingState::start())
1365-
.await?;
1361+
let (result, paging_state) = self.execute(bound, None, PagingState::start()).await?;
13661362
if !paging_state.finished() {
13671363
error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
13681364
return Err(ExecutionError::LastAttemptError(
@@ -1374,14 +1370,11 @@ impl Session {
13741370

13751371
async fn do_execute_single_page(
13761372
&self,
1377-
prepared: &PreparedStatement,
1378-
values: impl SerializeRow,
1373+
bound: &BoundStatement<'_>,
13791374
paging_state: PagingState,
13801375
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1381-
let serialized_values = prepared.serialize_values(&values)?;
1382-
let page_size = prepared.get_validated_page_size();
1383-
self.execute(prepared, &serialized_values, Some(page_size), paging_state)
1384-
.await
1376+
let page_size = bound.prepared.get_validated_page_size();
1377+
self.execute(bound, Some(page_size), paging_state).await
13851378
}
13861379

13871380
/// Sends a prepared request to the database, optionally continuing from a saved point.
@@ -1396,44 +1389,45 @@ impl Session {
13961389
/// should be made.
13971390
async fn execute(
13981391
&self,
1399-
prepared: &PreparedStatement,
1400-
serialized_values: &SerializedValues,
1392+
bound: &BoundStatement<'_>,
14011393
page_size: Option<PageSize>,
14021394
paging_state: PagingState,
14031395
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1404-
let values_ref = &serialized_values;
14051396
let paging_state_ref = &paging_state;
14061397

1407-
let (partition_key, token) = prepared
1408-
.extract_partition_key_and_calculate_token(prepared.get_partitioner_name(), values_ref)
1398+
let (partition_key, token) = bound
1399+
.pk_and_token()
14091400
.map_err(PartitionKeyError::into_execution_error)?
14101401
.unzip();
14111402

1412-
let execution_profile = prepared
1403+
let execution_profile = bound
1404+
.prepared
14131405
.get_execution_profile_handle()
14141406
.unwrap_or_else(|| self.get_default_execution_profile_handle())
14151407
.access();
14161408

1417-
let table_spec = prepared.get_table_spec();
1409+
let table_spec = bound.prepared.get_table_spec();
14181410

14191411
let statement_info = RoutingInfo {
1420-
consistency: prepared
1412+
consistency: bound
1413+
.prepared
14211414
.config
14221415
.consistency
14231416
.unwrap_or(execution_profile.consistency),
1424-
serial_consistency: prepared
1417+
serial_consistency: bound
1418+
.prepared
14251419
.config
14261420
.serial_consistency
14271421
.unwrap_or(execution_profile.serial_consistency),
14281422
token,
14291423
table: table_spec,
1430-
is_confirmed_lwt: prepared.is_confirmed_lwt(),
1424+
is_confirmed_lwt: bound.prepared.is_confirmed_lwt(),
14311425
};
14321426

14331427
let span = RequestSpan::new_prepared(
14341428
partition_key.as_ref().map(|pk| pk.iter()),
14351429
token,
1436-
serialized_values.buffer_size(),
1430+
bound.values.buffer_size(),
14371431
);
14381432

14391433
if !span.span().is_disabled() {
@@ -1450,20 +1444,20 @@ impl Session {
14501444
) = self
14511445
.run_request(
14521446
statement_info,
1453-
&prepared.config,
1447+
&bound.prepared.config,
14541448
execution_profile,
14551449
|connection: Arc<Connection>,
14561450
consistency: Consistency,
14571451
execution_profile: &ExecutionProfileInner| {
1458-
let serial_consistency = prepared
1452+
let serial_consistency = bound
1453+
.prepared
14591454
.config
14601455
.serial_consistency
14611456
.unwrap_or(execution_profile.serial_consistency);
14621457
async move {
14631458
connection
14641459
.execute_raw_with_consistency(
1465-
prepared,
1466-
values_ref,
1460+
bound,
14671461
consistency,
14681462
serial_consistency,
14691463
page_size,
@@ -1496,19 +1490,16 @@ impl Session {
14961490

14971491
async fn do_execute_iter(
14981492
&self,
1499-
prepared: PreparedStatement,
1500-
values: impl SerializeRow,
1493+
bound: BoundStatement<'static>,
15011494
) -> Result<QueryPager, PagerExecutionError> {
1502-
let serialized_values = prepared.serialize_values(&values)?;
1503-
1504-
let execution_profile = prepared
1495+
let execution_profile = bound
1496+
.prepared
15051497
.get_execution_profile_handle()
15061498
.unwrap_or_else(|| self.get_default_execution_profile_handle())
15071499
.access();
15081500

15091501
QueryPager::new_for_prepared_statement(PreparedPagerConfig {
1510-
prepared,
1511-
values: serialized_values,
1502+
bound,
15121503
execution_profile,
15131504
cluster_state: self.cluster.get_state(),
15141505
#[cfg(feature = "metrics")]

0 commit comments

Comments
 (0)