Skip to content

Commit 739bce7

Browse files
committed
introduce Execute trait for statements that don't need values
For now it is a sealed trait implemented only for BoundStatement and BoundBatch
1 parent 0cc0531 commit 739bce7

File tree

3 files changed

+128
-55
lines changed

3 files changed

+128
-55
lines changed

scylla/src/client/session.rs

Lines changed: 19 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::routing::{Shard, ShardAwarePortRange};
3838
use crate::statement::batch::BoundBatch;
3939
use crate::statement::batch::{Batch, BatchStatement};
4040
use crate::statement::bound::BoundStatement;
41+
use crate::statement::execute::{Execute, ExecutePageable};
4142
use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
4243
use crate::statement::unprepared::Statement;
4344
use crate::statement::{Consistency, PageSize, StatementConfig};
@@ -58,7 +59,7 @@ use std::time::Duration;
5859
use tokio::time::timeout;
5960
#[cfg(feature = "unstable-cloud")]
6061
use tracing::warn;
61-
use tracing::{debug, error, trace, trace_span, Instrument};
62+
use tracing::{debug, trace, trace_span, Instrument};
6263
use uuid::Uuid;
6364

6465
pub(crate) const TABLET_CHANNEL_SIZE: usize = 8192;
@@ -483,7 +484,8 @@ impl Session {
483484
statement: impl Into<Statement>,
484485
values: impl SerializeRow,
485486
) -> Result<QueryResult, ExecutionError> {
486-
self.do_query_unpaged(&statement.into(), values).await
487+
let statement = statement.into();
488+
(&statement, values).execute(self).await
487489
}
488490

489491
/// Queries a single page from the database, optionally continuing from a saved point.
@@ -543,7 +545,9 @@ impl Session {
543545
values: impl SerializeRow,
544546
paging_state: PagingState,
545547
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
546-
self.do_query_single_page(&statement.into(), values, paging_state)
548+
let statement = statement.into();
549+
(&statement, values)
550+
.execute_pageable::<true>(self, paging_state)
547551
.await
548552
}
549553

@@ -808,9 +812,9 @@ impl Session {
808812
values: impl BatchValues,
809813
) -> Result<QueryResult, ExecutionError> {
810814
let batch = self.last_minute_prepare_batch(batch, &values).await?;
811-
let batch = BoundBatch::from_batch(batch.as_ref(), values)?;
812-
813-
self.do_batch(&batch).await
815+
BoundBatch::from_batch(batch.as_ref(), values)?
816+
.execute(self)
817+
.await
814818
}
815819
}
816820

@@ -991,38 +995,6 @@ impl Session {
991995
Ok(session)
992996
}
993997

994-
async fn do_query_unpaged(
995-
&self,
996-
statement: &Statement,
997-
values: impl SerializeRow,
998-
) -> Result<QueryResult, ExecutionError> {
999-
let (result, paging_state_response) = self
1000-
.query(statement, values, None, PagingState::start())
1001-
.await?;
1002-
if !paging_state_response.finished() {
1003-
error!("Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
1004-
return Err(ExecutionError::LastAttemptError(
1005-
RequestAttemptError::NonfinishedPagingState,
1006-
));
1007-
}
1008-
Ok(result)
1009-
}
1010-
1011-
async fn do_query_single_page(
1012-
&self,
1013-
statement: &Statement,
1014-
values: impl SerializeRow,
1015-
paging_state: PagingState,
1016-
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1017-
self.query(
1018-
statement,
1019-
values,
1020-
Some(statement.get_validated_page_size()),
1021-
paging_state,
1022-
)
1023-
.await
1024-
}
1025-
1026998
/// Sends a request to the database.
1027999
/// Optionally continues fetching results from a saved point.
10281000
///
@@ -1034,7 +1006,7 @@ impl Session {
10341006
/// that we need to require users to make a conscious decision to use paging or not. For that, we expose
10351007
/// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
10361008
/// should be made.
1037-
async fn query(
1009+
pub(crate) async fn query(
10381010
&self,
10391011
statement: &Statement,
10401012
values: impl SerializeRow,
@@ -1299,23 +1271,15 @@ impl Session {
12991271
&self,
13001272
statement: &BoundStatement,
13011273
) -> Result<QueryResult, ExecutionError> {
1302-
let (result, paging_state) = self.execute(statement, None, PagingState::start()).await?;
1303-
if !paging_state.finished() {
1304-
error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
1305-
return Err(ExecutionError::LastAttemptError(
1306-
RequestAttemptError::NonfinishedPagingState,
1307-
));
1308-
}
1309-
Ok(result)
1274+
statement.execute(self).await
13101275
}
13111276

13121277
async fn do_execute_single_page(
13131278
&self,
13141279
statement: &BoundStatement,
13151280
paging_state: PagingState,
13161281
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1317-
let page_size = statement.prepared.get_validated_page_size();
1318-
self.execute(statement, Some(page_size), paging_state).await
1282+
statement.execute_pageable::<true>(self, paging_state).await
13191283
}
13201284

13211285
/// Sends a prepared request to the database, optionally continuing from a saved point.
@@ -1328,7 +1292,7 @@ impl Session {
13281292
/// that we need to require users to make a conscious decision to use paging or not. For that, we expose
13291293
/// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
13301294
/// should be made.
1331-
async fn execute(
1295+
pub(crate) async fn execute_bound_statement(
13321296
&self,
13331297
statement: &BoundStatement,
13341298
page_size: Option<PageSize>,
@@ -1449,7 +1413,7 @@ impl Session {
14491413
.map_err(PagerExecutionError::NextPageError)
14501414
}
14511415

1452-
async fn do_batch(&self, batch: &BoundBatch) -> Result<QueryResult, ExecutionError> {
1416+
pub(crate) async fn do_batch(&self, batch: &BoundBatch) -> Result<QueryResult, ExecutionError> {
14531417
// Shard-awareness behavior for batch will be to pick shard based on first batch statement's shard
14541418
// If users batch statements by shard, they will be rewarded with full shard awareness
14551419
let execution_profile = batch
@@ -1750,10 +1714,10 @@ impl Session {
17501714
traces_events_query.config.consistency = consistency;
17511715
traces_events_query.set_page_size(TRACING_QUERY_PAGE_SIZE);
17521716

1753-
let (traces_session_res, traces_events_res) = tokio::try_join!(
1754-
self.do_query_unpaged(&traces_session_query, (tracing_id,)),
1755-
self.do_query_unpaged(&traces_events_query, (tracing_id,))
1756-
)?;
1717+
let session_query = (&traces_session_query, (tracing_id,));
1718+
let events_query = (&traces_events_query, (tracing_id,));
1719+
let (traces_session_res, traces_events_res) =
1720+
tokio::try_join!(session_query.execute(self), events_query.execute(self))?;
17571721

17581722
// Get tracing info
17591723
let maybe_tracing_info: Option<TracingInfo> = traces_session_res

scylla/src/statement/execute.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use scylla_cql::{
2+
frame::request::query::{PagingState, PagingStateResponse},
3+
serialize::row::SerializeRow,
4+
};
5+
use tracing::error;
6+
7+
use crate::{
8+
client::session::Session,
9+
errors::{ExecutionError, RequestAttemptError},
10+
response::query_result::QueryResult,
11+
};
12+
13+
use super::{batch::BoundBatch, bound::BoundStatement, Statement};
14+
15+
// seals the trait to foreign implementations
16+
mod private {
17+
#[allow(unnameable_types)]
18+
pub trait Sealed {}
19+
}
20+
21+
/// A type that can be executed on a [`Session`] without any additional values
22+
///
23+
/// In practice this means that the statement(s) all already had their values bound.
24+
pub trait Execute: private::Sealed {
25+
/// Executes on the session
26+
fn execute(
27+
&self,
28+
session: &Session,
29+
) -> impl std::future::Future<Output = Result<QueryResult, ExecutionError>>;
30+
}
31+
32+
/// A type that can be executed on a [`Session`] and is aware of pagination
33+
pub trait ExecutePageable {
34+
/// Executes a command with the `paging_state` determining where the results should start
35+
///
36+
/// If SINGLE_PAGE is set to true then a single page is returned. If SINGLE_PAGE is set to
37+
/// false, then all pages (starting at `paging_state`) are returned
38+
fn execute_pageable<const SINGLE_PAGE: bool>(
39+
&self,
40+
session: &Session,
41+
paging_state: PagingState,
42+
) -> impl std::future::Future<Output = Result<(QueryResult, PagingStateResponse), ExecutionError>>;
43+
}
44+
45+
impl<T: ExecutePageable + private::Sealed> Execute for T {
46+
/// Executes the pageable type but getting all pages from the start
47+
async fn execute(&self, session: &Session) -> Result<QueryResult, ExecutionError> {
48+
let (result, paging_state) = self
49+
.execute_pageable::<false>(session, PagingState::start())
50+
.await?;
51+
52+
if !paging_state.finished() {
53+
error!("Unpaged query returned a non-empty paging state! This is a driver-side or server-side bug.");
54+
return Err(ExecutionError::LastAttemptError(
55+
RequestAttemptError::NonfinishedPagingState,
56+
));
57+
}
58+
59+
Ok(result)
60+
}
61+
}
62+
63+
impl private::Sealed for BoundBatch {}
64+
65+
impl Execute for BoundBatch {
66+
async fn execute(&self, session: &Session) -> Result<QueryResult, ExecutionError> {
67+
session.do_batch(self).await
68+
}
69+
}
70+
71+
impl private::Sealed for BoundStatement {}
72+
73+
impl ExecutePageable for BoundStatement {
74+
async fn execute_pageable<const SINGLE_PAGE: bool>(
75+
&self,
76+
session: &Session,
77+
paging_state: PagingState,
78+
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
79+
let page_size = if SINGLE_PAGE {
80+
Some(self.prepared.get_validated_page_size())
81+
} else {
82+
None
83+
};
84+
session
85+
.execute_bound_statement(self, page_size, paging_state)
86+
.await
87+
}
88+
}
89+
90+
impl<V: SerializeRow> private::Sealed for (&Statement, V) {}
91+
92+
impl<V: SerializeRow> ExecutePageable for (&Statement, V) {
93+
async fn execute_pageable<const SINGLE_PAGE: bool>(
94+
&self,
95+
session: &Session,
96+
paging_state: PagingState,
97+
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
98+
let page_size = if SINGLE_PAGE {
99+
Some(self.0.get_validated_page_size())
100+
} else {
101+
None
102+
};
103+
104+
session
105+
.query(self.0, &self.1, page_size, paging_state)
106+
.await
107+
}
108+
}

scylla/src/statement/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::policies::retry::RetryPolicy;
1616

1717
pub mod batch;
1818
pub mod bound;
19+
pub mod execute;
1920
pub mod prepared;
2021
pub mod unprepared;
2122

0 commit comments

Comments
 (0)