Skip to content

Commit 6bb3f88

Browse files
committed
Add an internal only BoundStatement
this struct keeps track of a PreparedStatement and SerializedValues
1 parent 0085612 commit 6bb3f88

File tree

8 files changed

+187
-155
lines changed

8 files changed

+187
-155
lines changed

scylla/src/client/pager.rs

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use scylla_cql::frame::request::query::PagingState;
1818
use scylla_cql::frame::response::result::RawMetadataAndRawRows;
1919
use scylla_cql::frame::response::NonErrorResponse;
2020
use scylla_cql::frame::types::SerialConsistency;
21-
use scylla_cql::serialize::row::SerializedValues;
2221
use scylla_cql::Consistency;
2322
use std::result::Result;
2423
use thiserror::Error;
@@ -38,7 +37,8 @@ use crate::policies::load_balancing::{self, LoadBalancingPolicy, RoutingInfo};
3837
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
3938
use crate::response::query_result::ColumnSpecs;
4039
use crate::response::{NonErrorQueryResponse, QueryResponse};
41-
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
40+
use crate::statement::bound::BoundStatement;
41+
use crate::statement::prepared::PartitionKeyError;
4242
use crate::statement::unprepared::Statement;
4343
use tracing::{trace, trace_span, warn, Instrument};
4444
use uuid::Uuid;
@@ -63,8 +63,7 @@ struct ReceivedPage {
6363
}
6464

6565
pub(crate) struct PreparedPagerConfig {
66-
pub(crate) prepared: PreparedStatement,
67-
pub(crate) values: SerializedValues,
66+
pub(crate) bound: BoundStatement<'static>,
6867
pub(crate) execution_profile: Arc<ExecutionProfileInner>,
6968
pub(crate) cluster_state: Arc<ClusterState>,
7069
#[cfg(feature = "metrics")]
@@ -768,26 +767,30 @@ impl QueryPager {
768767
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
769768

770769
let consistency = config
770+
.bound
771771
.prepared
772772
.config
773773
.consistency
774774
.unwrap_or(config.execution_profile.consistency);
775775
let serial_consistency = config
776+
.bound
776777
.prepared
777778
.config
778779
.serial_consistency
779780
.unwrap_or(config.execution_profile.serial_consistency);
780781

781-
let page_size = config.prepared.get_validated_page_size();
782+
let page_size = config.bound.prepared.get_validated_page_size();
782783

783784
let load_balancing_policy = Arc::clone(
784785
config
786+
.bound
785787
.prepared
786788
.get_load_balancing_policy()
787789
.unwrap_or(&config.execution_profile.load_balancing_policy),
788790
);
789791

790792
let retry_session = config
793+
.bound
791794
.prepared
792795
.get_retry_policy()
793796
.map(|rp| &**rp)
@@ -796,14 +799,7 @@ impl QueryPager {
796799

797800
let parent_span = tracing::Span::current();
798801
let worker_task = async move {
799-
let prepared_ref = &config.prepared;
800-
let values_ref = &config.values;
801-
802-
let (partition_key, token) = match prepared_ref
803-
.extract_partition_key_and_calculate_token(
804-
prepared_ref.get_partitioner_name(),
805-
values_ref,
806-
) {
802+
let (partition_key, token) = match config.bound.pk_and_token() {
807803
Ok(res) => res.unzip(),
808804
Err(err) => {
809805
let (proof, _res) = ProvingSender::from(sender)
@@ -813,22 +809,22 @@ impl QueryPager {
813809
}
814810
};
815811

816-
let table_spec = config.prepared.get_table_spec();
812+
let table_spec = config.bound.prepared.get_table_spec();
817813
let statement_info = RoutingInfo {
818814
consistency,
819815
serial_consistency,
820816
token,
821817
table: table_spec,
822-
is_confirmed_lwt: config.prepared.is_confirmed_lwt(),
818+
is_confirmed_lwt: config.bound.prepared.is_confirmed_lwt(),
823819
};
824820

821+
let statement = &config.bound;
825822
let page_query = |connection: Arc<Connection>,
826823
consistency: Consistency,
827824
paging_state: PagingState| async move {
828825
connection
829826
.execute_raw_with_consistency(
830-
prepared_ref,
831-
values_ref,
827+
statement,
832828
consistency,
833829
serial_consistency,
834830
Some(page_size),
@@ -837,7 +833,7 @@ impl QueryPager {
837833
.await
838834
};
839835

840-
let serialized_values_size = config.values.buffer_size();
836+
let serialized_values_size = config.bound.values.buffer_size();
841837

842838
let replicas: Option<smallvec::SmallVec<[_; 8]>> =
843839
if let (Some(table_spec), Some(token)) =
@@ -870,14 +866,14 @@ impl QueryPager {
870866
sender: sender.into(),
871867
page_query,
872868
statement_info,
873-
query_is_idempotent: config.prepared.config.is_idempotent,
869+
query_is_idempotent: config.bound.prepared.config.is_idempotent,
874870
query_consistency: consistency,
875871
load_balancing_policy,
876872
retry_session,
877873
#[cfg(feature = "metrics")]
878874
metrics: config.metrics,
879875
paging_state: PagingState::start(),
880-
history_listener: config.prepared.config.history_listener.clone(),
876+
history_listener: config.bound.prepared.config.history_listener.clone(),
881877
current_request_id: None,
882878
current_attempt_id: None,
883879
parent_span,
@@ -920,23 +916,21 @@ impl QueryPager {
920916
}
921917

922918
pub(crate) async fn new_for_connection_execute_iter(
923-
prepared: PreparedStatement,
924-
values: SerializedValues,
919+
bound: BoundStatement<'static>,
925920
connection: Arc<Connection>,
926921
consistency: Consistency,
927922
serial_consistency: Option<SerialConsistency>,
928923
) -> Result<Self, NextPageError> {
929924
let (sender, receiver) = mpsc::channel::<Result<ReceivedPage, NextPageError>>(1);
930925

931-
let page_size = prepared.get_validated_page_size();
926+
let page_size = bound.prepared.get_validated_page_size();
932927

933928
let worker_task = async move {
934929
let worker = SingleConnectionPagerWorker {
935930
sender: sender.into(),
936931
fetcher: |paging_state| {
937932
connection.execute_raw_with_consistency(
938-
&prepared,
939-
&values,
933+
&bound,
940934
consistency,
941935
serial_consistency,
942936
Some(page_size),

scylla/src/client/session.rs

Lines changed: 38 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::routing::partitioner::PartitionerName;
3737
use crate::routing::{Shard, ShardAwarePortRange};
3838
use crate::statement::batch::batch_values;
3939
use crate::statement::batch::{Batch, BatchStatement};
40+
use crate::statement::bound::BoundStatement;
4041
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
4142
use crate::statement::unprepared::Statement;
4243
use crate::statement::{Consistency, PageSize, StatementConfig};
@@ -46,7 +47,7 @@ use futures::future::try_join_all;
4647
use itertools::Itertools;
4748
use scylla_cql::frame::response::NonErrorResponse;
4849
use scylla_cql::serialize::batch::BatchValues;
49-
use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
50+
use scylla_cql::serialize::row::SerializeRow;
5051
use std::borrow::Borrow;
5152
use std::future::Future;
5253
use std::net::{IpAddr, SocketAddr};
@@ -637,7 +638,8 @@ impl Session {
637638
prepared: &PreparedStatement,
638639
values: impl SerializeRow,
639640
) -> Result<QueryResult, ExecutionError> {
640-
self.do_execute_unpaged(prepared, values).await
641+
let bound = prepared.bind(&values)?;
642+
self.do_execute_unpaged(&bound).await
641643
}
642644

643645
/// Executes a prepared statement, restricting results to single page.
@@ -702,8 +704,8 @@ impl Session {
702704
values: impl SerializeRow,
703705
paging_state: PagingState,
704706
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
705-
self.do_execute_single_page(prepared, values, paging_state)
706-
.await
707+
let bound = prepared.bind(&values)?;
708+
self.do_execute_single_page(&bound, paging_state).await
707709
}
708710

709711
/// Execute a prepared statement with paging.\
@@ -750,7 +752,8 @@ impl Session {
750752
prepared: impl Into<PreparedStatement>,
751753
values: impl SerializeRow,
752754
) -> Result<QueryPager, PagerExecutionError> {
753-
self.do_execute_iter(prepared.into(), values).await
755+
let bound = prepared.into().into_bind(&values)?;
756+
self.do_execute_iter(bound).await
754757
}
755758

756759
/// Execute a batch statement\
@@ -1083,12 +1086,11 @@ impl Session {
10831086
.and_then(QueryResponse::into_non_error_query_response)
10841087
} else {
10851088
let prepared = connection.prepare(statement).await?;
1086-
let serialized = prepared.serialize_values(values_ref)?;
1087-
span_ref.record_request_size(serialized.buffer_size());
1089+
let bound = prepared.bind(values_ref)?;
1090+
span_ref.record_request_size(bound.values.buffer_size());
10881091
connection
10891092
.execute_raw_with_consistency(
1090-
&prepared,
1091-
&serialized,
1093+
&bound,
10921094
consistency,
10931095
serial_consistency,
10941096
page_size,
@@ -1181,11 +1183,9 @@ impl Session {
11811183
// Making QueryPager::new_for_query work with values is too hard (if even possible)
11821184
// so instead of sending one prepare to a specific connection on each iterator query,
11831185
// we fully prepare a statement beforehand.
1184-
let prepared = self.prepare(statement).await?;
1185-
let values = prepared.serialize_values(&values)?;
1186+
let bound = self.prepare(statement).await?.into_bind(&values)?;
11861187
QueryPager::new_for_prepared_statement(PreparedPagerConfig {
1187-
prepared,
1188-
values,
1188+
bound,
11891189
execution_profile,
11901190
cluster_state: self.cluster.get_state(),
11911191
#[cfg(feature = "metrics")]
@@ -1293,13 +1293,9 @@ impl Session {
12931293

12941294
async fn do_execute_unpaged(
12951295
&self,
1296-
prepared: &PreparedStatement,
1297-
values: impl SerializeRow,
1296+
bound: &BoundStatement<'_>,
12981297
) -> Result<QueryResult, ExecutionError> {
1299-
let serialized_values = prepared.serialize_values(&values)?;
1300-
let (result, paging_state) = self
1301-
.execute(prepared, &serialized_values, None, PagingState::start())
1302-
.await?;
1298+
let (result, paging_state) = self.execute(bound, None, PagingState::start()).await?;
13031299
if !paging_state.finished() {
13041300
error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
13051301
return Err(ExecutionError::LastAttemptError(
@@ -1311,14 +1307,11 @@ impl Session {
13111307

13121308
async fn do_execute_single_page(
13131309
&self,
1314-
prepared: &PreparedStatement,
1315-
values: impl SerializeRow,
1310+
bound: &BoundStatement<'_>,
13161311
paging_state: PagingState,
13171312
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1318-
let serialized_values = prepared.serialize_values(&values)?;
1319-
let page_size = prepared.get_validated_page_size();
1320-
self.execute(prepared, &serialized_values, Some(page_size), paging_state)
1321-
.await
1313+
let page_size = bound.prepared.get_validated_page_size();
1314+
self.execute(bound, Some(page_size), paging_state).await
13221315
}
13231316

13241317
/// Sends a prepared request to the database, optionally continuing from a saved point.
@@ -1333,44 +1326,45 @@ impl Session {
13331326
/// should be made.
13341327
async fn execute(
13351328
&self,
1336-
prepared: &PreparedStatement,
1337-
serialized_values: &SerializedValues,
1329+
bound: &BoundStatement<'_>,
13381330
page_size: Option<PageSize>,
13391331
paging_state: PagingState,
13401332
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1341-
let values_ref = &serialized_values;
13421333
let paging_state_ref = &paging_state;
13431334

1344-
let (partition_key, token) = prepared
1345-
.extract_partition_key_and_calculate_token(prepared.get_partitioner_name(), values_ref)
1335+
let (partition_key, token) = bound
1336+
.pk_and_token()
13461337
.map_err(PartitionKeyError::into_execution_error)?
13471338
.unzip();
13481339

1349-
let execution_profile = prepared
1340+
let execution_profile = bound
1341+
.prepared
13501342
.get_execution_profile_handle()
13511343
.unwrap_or_else(|| self.get_default_execution_profile_handle())
13521344
.access();
13531345

1354-
let table_spec = prepared.get_table_spec();
1346+
let table_spec = bound.prepared.get_table_spec();
13551347

13561348
let statement_info = RoutingInfo {
1357-
consistency: prepared
1349+
consistency: bound
1350+
.prepared
13581351
.config
13591352
.consistency
13601353
.unwrap_or(execution_profile.consistency),
1361-
serial_consistency: prepared
1354+
serial_consistency: bound
1355+
.prepared
13621356
.config
13631357
.serial_consistency
13641358
.unwrap_or(execution_profile.serial_consistency),
13651359
token,
13661360
table: table_spec,
1367-
is_confirmed_lwt: prepared.is_confirmed_lwt(),
1361+
is_confirmed_lwt: bound.prepared.is_confirmed_lwt(),
13681362
};
13691363

13701364
let span = RequestSpan::new_prepared(
13711365
partition_key.as_ref().map(|pk| pk.iter()),
13721366
token,
1373-
serialized_values.buffer_size(),
1367+
bound.values.buffer_size(),
13741368
);
13751369

13761370
if !span.span().is_disabled() {
@@ -1384,20 +1378,20 @@ impl Session {
13841378
let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
13851379
.run_request(
13861380
statement_info,
1387-
&prepared.config,
1381+
&bound.prepared.config,
13881382
execution_profile,
13891383
|connection: Arc<Connection>,
13901384
consistency: Consistency,
13911385
execution_profile: &ExecutionProfileInner| {
1392-
let serial_consistency = prepared
1386+
let serial_consistency = bound
1387+
.prepared
13931388
.config
13941389
.serial_consistency
13951390
.unwrap_or(execution_profile.serial_consistency);
13961391
async move {
13971392
connection
13981393
.execute_raw_with_consistency(
1399-
prepared,
1400-
values_ref,
1394+
bound,
14011395
consistency,
14021396
serial_consistency,
14031397
page_size,
@@ -1432,19 +1426,16 @@ impl Session {
14321426

14331427
async fn do_execute_iter(
14341428
&self,
1435-
prepared: PreparedStatement,
1436-
values: impl SerializeRow,
1429+
bound: BoundStatement<'static>,
14371430
) -> Result<QueryPager, PagerExecutionError> {
1438-
let serialized_values = prepared.serialize_values(&values)?;
1439-
1440-
let execution_profile = prepared
1431+
let execution_profile = bound
1432+
.prepared
14411433
.get_execution_profile_handle()
14421434
.unwrap_or_else(|| self.get_default_execution_profile_handle())
14431435
.access();
14441436

14451437
QueryPager::new_for_prepared_statement(PreparedPagerConfig {
1446-
prepared,
1447-
values: serialized_values,
1438+
bound,
14481439
execution_profile,
14491440
cluster_state: self.cluster.get_state(),
14501441
#[cfg(feature = "metrics")]

0 commit comments

Comments
 (0)