Skip to content

Commit 2061d9b

Browse files
committed
Add an internal only BoundStatement
this struct keeps track of a PreparedStatement and SerializedValues
1 parent bc6b24d commit 2061d9b

File tree

8 files changed

+186
-153
lines changed

8 files changed

+186
-153
lines changed

scylla/src/client/pager.rs

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use scylla_cql::frame::request::query::PagingState;
1818
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
1919
use scylla_cql::frame::response::NonErrorResponse;
2020
use scylla_cql::frame::types::SerialConsistency;
21-
use scylla_cql::serialize::row::SerializedValues;
2221
use scylla_cql::Consistency;
2322
use std::result::Result;
2423
use thiserror::Error;
@@ -38,7 +37,8 @@ use crate::policies::load_balancing::{self, LoadBalancingPolicy, RoutingInfo};
3837
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
3938
use crate::response::query_result::ColumnSpecs;
4039
use crate::response::{NonErrorQueryResponse, QueryResponse};
41-
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
40+
use crate::statement::bound::BoundStatement;
41+
use crate::statement::prepared::PartitionKeyError;
4242
use crate::statement::unprepared::Statement;
4343
use tracing::{trace, trace_span, warn, Instrument};
4444
use uuid::Uuid;
@@ -64,8 +64,7 @@ struct ReceivedPage {
6464
}
6565

6666
pub(crate) struct PreparedPagerConfig {
67-
pub(crate) prepared: PreparedStatement,
68-
pub(crate) values: SerializedValues,
67+
pub(crate) bound: BoundStatement<'static>,
6968
pub(crate) execution_profile: Arc<ExecutionProfileInner>,
7069
pub(crate) cluster_state: Arc<ClusterState>,
7170
#[cfg(feature = "metrics")]
@@ -803,26 +802,30 @@ impl QueryPager {
803802
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
804803

805804
let consistency = config
805+
.bound
806806
.prepared
807807
.config
808808
.consistency
809809
.unwrap_or(config.execution_profile.consistency);
810810
let serial_consistency = config
811+
.bound
811812
.prepared
812813
.config
813814
.serial_consistency
814815
.unwrap_or(config.execution_profile.serial_consistency);
815816

816-
let page_size = config.prepared.get_validated_page_size();
817+
let page_size = config.bound.prepared.get_validated_page_size();
817818

818819
let load_balancing_policy = Arc::clone(
819820
config
821+
.bound
820822
.prepared
821823
.get_load_balancing_policy()
822824
.unwrap_or(&config.execution_profile.load_balancing_policy),
823825
);
824826

825827
let retry_session = config
828+
.bound
826829
.prepared
827830
.get_retry_policy()
828831
.map(|rp| &**rp)
@@ -831,14 +834,7 @@ impl QueryPager {
831834

832835
let parent_span = tracing::Span::current();
833836
let worker_task = async move {
834-
let prepared_ref = &config.prepared;
835-
let values_ref = &config.values;
836-
837-
let (partition_key, token) = match prepared_ref
838-
.extract_partition_key_and_calculate_token(
839-
prepared_ref.get_partitioner_name(),
840-
values_ref,
841-
) {
837+
let (partition_key, token) = match config.bound.pk_and_token() {
842838
Ok(res) => res.unzip(),
843839
Err(err) => {
844840
let (proof, _res) = ProvingSender::from(sender)
@@ -848,22 +844,22 @@ impl QueryPager {
848844
}
849845
};
850846

851-
let table_spec = config.prepared.get_table_spec();
847+
let table_spec = config.bound.prepared.get_table_spec();
852848
let statement_info = RoutingInfo {
853849
consistency,
854850
serial_consistency,
855851
token,
856852
table: table_spec,
857-
is_confirmed_lwt: config.prepared.is_confirmed_lwt(),
853+
is_confirmed_lwt: config.bound.prepared.is_confirmed_lwt(),
858854
};
859855

856+
let statement = &config.bound;
860857
let page_query = |connection: Arc<Connection>,
861858
consistency: Consistency,
862859
paging_state: PagingState| async move {
863860
connection
864861
.execute_raw_with_consistency(
865-
prepared_ref,
866-
values_ref,
862+
statement,
867863
consistency,
868864
serial_consistency,
869865
Some(page_size),
@@ -872,7 +868,7 @@ impl QueryPager {
872868
.await
873869
};
874870

875-
let serialized_values_size = config.values.buffer_size();
871+
let serialized_values_size = config.bound.values.buffer_size();
876872

877873
let replicas: Option<smallvec::SmallVec<[_; 8]>> =
878874
if let (Some(table_spec), Some(token)) =
@@ -905,14 +901,14 @@ impl QueryPager {
905901
sender: sender.into(),
906902
page_query,
907903
statement_info,
908-
query_is_idempotent: config.prepared.config.is_idempotent,
904+
query_is_idempotent: config.bound.prepared.config.is_idempotent,
909905
query_consistency: consistency,
910906
load_balancing_policy,
911907
retry_session,
912908
#[cfg(feature = "metrics")]
913909
metrics: config.metrics,
914910
paging_state: PagingState::start(),
915-
history_listener: config.prepared.config.history_listener.clone(),
911+
history_listener: config.bound.prepared.config.history_listener.clone(),
916912
current_request_id: None,
917913
current_attempt_id: None,
918914
parent_span,
@@ -955,23 +951,21 @@ impl QueryPager {
955951
}
956952

957953
pub(crate) async fn new_for_connection_execute_iter(
958-
prepared: PreparedStatement,
959-
values: SerializedValues,
954+
bound: BoundStatement<'static>,
960955
connection: Arc<Connection>,
961956
consistency: Consistency,
962957
serial_consistency: Option<SerialConsistency>,
963958
) -> Result<Self, NextPageError> {
964959
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
965960

966-
let page_size = prepared.get_validated_page_size();
961+
let page_size = bound.prepared.get_validated_page_size();
967962

968963
let worker_task = async move {
969964
let worker = SingleConnectionPagerWorker {
970965
sender: sender.into(),
971966
fetcher: |paging_state| {
972967
connection.execute_raw_with_consistency(
973-
&prepared,
974-
&values,
968+
&bound,
975969
consistency,
976970
serial_consistency,
977971
Some(page_size),

scylla/src/client/session.rs

Lines changed: 38 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::routing::partitioner::PartitionerName;
3838
use crate::routing::{Shard, ShardAwarePortRange};
3939
use crate::statement::batch::batch_values;
4040
use crate::statement::batch::{Batch, BatchStatement};
41+
use crate::statement::bound::BoundStatement;
4142
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
4243
use crate::statement::unprepared::Statement;
4344
use crate::statement::{Consistency, PageSize, StatementConfig};
@@ -47,7 +48,7 @@ use futures::future::try_join_all;
4748
use itertools::Itertools;
4849
use scylla_cql::frame::response::NonErrorResponse;
4950
use scylla_cql::serialize::batch::BatchValues;
50-
use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
51+
use scylla_cql::serialize::row::SerializeRow;
5152
use std::borrow::Borrow;
5253
use std::future::Future;
5354
use std::net::{IpAddr, SocketAddr};
@@ -640,7 +641,8 @@ impl Session {
640641
prepared: &PreparedStatement,
641642
values: impl SerializeRow,
642643
) -> Result<QueryResult, ExecutionError> {
643-
self.do_execute_unpaged(prepared, values).await
644+
let bound = prepared.bind(&values)?;
645+
self.do_execute_unpaged(&bound).await
644646
}
645647

646648
/// Executes a prepared statement, restricting results to single page.
@@ -705,8 +707,8 @@ impl Session {
705707
values: impl SerializeRow,
706708
paging_state: PagingState,
707709
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
708-
self.do_execute_single_page(prepared, values, paging_state)
709-
.await
710+
let bound = prepared.bind(&values)?;
711+
self.do_execute_single_page(&bound, paging_state).await
710712
}
711713

712714
/// Execute a prepared statement with paging.\
@@ -753,7 +755,8 @@ impl Session {
753755
prepared: impl Into<PreparedStatement>,
754756
values: impl SerializeRow,
755757
) -> Result<QueryPager, PagerExecutionError> {
756-
self.do_execute_iter(prepared.into(), values).await
758+
let bound = prepared.into().into_bind(&values)?;
759+
self.do_execute_iter(bound).await
757760
}
758761

759762
/// Execute a batch statement\
@@ -1085,12 +1088,11 @@ impl Session {
10851088
.and_then(QueryResponse::into_non_error_query_response)
10861089
} else {
10871090
let prepared = connection.prepare(statement).await?;
1088-
let serialized = prepared.serialize_values(values_ref)?;
1089-
span_ref.record_request_size(serialized.buffer_size());
1091+
let bound = prepared.bind(values_ref)?;
1092+
span_ref.record_request_size(bound.values.buffer_size());
10901093
connection
10911094
.execute_raw_with_consistency(
1092-
&prepared,
1093-
&serialized,
1095+
&bound,
10941096
consistency,
10951097
serial_consistency,
10961098
page_size,
@@ -1187,11 +1189,9 @@ impl Session {
11871189
// Making QueryPager::new_for_query work with values is too hard (if even possible)
11881190
// so instead of sending one prepare to a specific connection on each iterator query,
11891191
// we fully prepare a statement beforehand.
1190-
let prepared = self.prepare_nongeneric(&statement).await?;
1191-
let values = prepared.serialize_values(&values)?;
1192+
let bound = self.prepare_nongeneric(&statement).await?.into_bind(&values)?;
11921193
QueryPager::new_for_prepared_statement(PreparedPagerConfig {
1193-
prepared,
1194-
values,
1194+
bound,
11951195
execution_profile,
11961196
cluster_state: self.cluster.get_state(),
11971197
#[cfg(feature = "metrics")]
@@ -1360,13 +1360,9 @@ impl Session {
13601360

13611361
async fn do_execute_unpaged(
13621362
&self,
1363-
prepared: &PreparedStatement,
1364-
values: impl SerializeRow,
1363+
bound: &BoundStatement<'_>,
13651364
) -> Result<QueryResult, ExecutionError> {
1366-
let serialized_values = prepared.serialize_values(&values)?;
1367-
let (result, paging_state) = self
1368-
.execute(prepared, &serialized_values, None, PagingState::start())
1369-
.await?;
1365+
let (result, paging_state) = self.execute(bound, None, PagingState::start()).await?;
13701366
if !paging_state.finished() {
13711367
error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
13721368
return Err(ExecutionError::LastAttemptError(
@@ -1378,14 +1374,11 @@ impl Session {
13781374

13791375
async fn do_execute_single_page(
13801376
&self,
1381-
prepared: &PreparedStatement,
1382-
values: impl SerializeRow,
1377+
bound: &BoundStatement<'_>,
13831378
paging_state: PagingState,
13841379
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1385-
let serialized_values = prepared.serialize_values(&values)?;
1386-
let page_size = prepared.get_validated_page_size();
1387-
self.execute(prepared, &serialized_values, Some(page_size), paging_state)
1388-
.await
1380+
let page_size = bound.prepared.get_validated_page_size();
1381+
self.execute(bound, Some(page_size), paging_state).await
13891382
}
13901383

13911384
/// Sends a prepared request to the database, optionally continuing from a saved point.
@@ -1400,44 +1393,45 @@ impl Session {
14001393
/// should be made.
14011394
async fn execute(
14021395
&self,
1403-
prepared: &PreparedStatement,
1404-
serialized_values: &SerializedValues,
1396+
bound: &BoundStatement<'_>,
14051397
page_size: Option<PageSize>,
14061398
paging_state: PagingState,
14071399
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1408-
let values_ref = &serialized_values;
14091400
let paging_state_ref = &paging_state;
14101401

1411-
let (partition_key, token) = prepared
1412-
.extract_partition_key_and_calculate_token(prepared.get_partitioner_name(), values_ref)
1402+
let (partition_key, token) = bound
1403+
.pk_and_token()
14131404
.map_err(PartitionKeyError::into_execution_error)?
14141405
.unzip();
14151406

1416-
let execution_profile = prepared
1407+
let execution_profile = bound
1408+
.prepared
14171409
.get_execution_profile_handle()
14181410
.unwrap_or_else(|| self.get_default_execution_profile_handle())
14191411
.access();
14201412

1421-
let table_spec = prepared.get_table_spec();
1413+
let table_spec = bound.prepared.get_table_spec();
14221414

14231415
let statement_info = RoutingInfo {
1424-
consistency: prepared
1416+
consistency: bound
1417+
.prepared
14251418
.config
14261419
.consistency
14271420
.unwrap_or(execution_profile.consistency),
1428-
serial_consistency: prepared
1421+
serial_consistency: bound
1422+
.prepared
14291423
.config
14301424
.serial_consistency
14311425
.unwrap_or(execution_profile.serial_consistency),
14321426
token,
14331427
table: table_spec,
1434-
is_confirmed_lwt: prepared.is_confirmed_lwt(),
1428+
is_confirmed_lwt: bound.prepared.is_confirmed_lwt(),
14351429
};
14361430

14371431
let span = RequestSpan::new_prepared(
14381432
partition_key.as_ref().map(|pk| pk.iter()),
14391433
token,
1440-
serialized_values.buffer_size(),
1434+
bound.values.buffer_size(),
14411435
);
14421436

14431437
if !span.span().is_disabled() {
@@ -1454,20 +1448,20 @@ impl Session {
14541448
) = self
14551449
.run_request(
14561450
statement_info,
1457-
&prepared.config,
1451+
&bound.prepared.config,
14581452
execution_profile,
14591453
|connection: Arc<Connection>,
14601454
consistency: Consistency,
14611455
execution_profile: &ExecutionProfileInner| {
1462-
let serial_consistency = prepared
1456+
let serial_consistency = bound
1457+
.prepared
14631458
.config
14641459
.serial_consistency
14651460
.unwrap_or(execution_profile.serial_consistency);
14661461
async move {
14671462
connection
14681463
.execute_raw_with_consistency(
1469-
prepared,
1470-
values_ref,
1464+
bound,
14711465
consistency,
14721466
serial_consistency,
14731467
page_size,
@@ -1504,19 +1498,16 @@ impl Session {
15041498

15051499
async fn do_execute_iter(
15061500
&self,
1507-
prepared: PreparedStatement,
1508-
values: impl SerializeRow,
1501+
bound: BoundStatement<'static>,
15091502
) -> Result<QueryPager, PagerExecutionError> {
1510-
let serialized_values = prepared.serialize_values(&values)?;
1511-
1512-
let execution_profile = prepared
1503+
let execution_profile = bound
1504+
.prepared
15131505
.get_execution_profile_handle()
15141506
.unwrap_or_else(|| self.get_default_execution_profile_handle())
15151507
.access();
15161508

15171509
QueryPager::new_for_prepared_statement(PreparedPagerConfig {
1518-
prepared,
1519-
values: serialized_values,
1510+
bound,
15201511
execution_profile,
15211512
cluster_state: self.cluster.get_state(),
15221513
#[cfg(feature = "metrics")]

0 commit comments

Comments
 (0)