Skip to content

Commit 66bd241

Browse files
authored
Merge pull request #707 from piodul/readd-set-retry-policy
statement/transport: re-add per-statement setters for retry policy
2 parents f2d7e97 + 4049811 commit 66bd241

File tree

8 files changed

+87
-11
lines changed

8 files changed

+87
-11
lines changed

docs/source/retry-policy/default.md

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,21 @@ To use in a [simple query](../queries/simple.md):
3333
# extern crate scylla;
3434
# use scylla::Session;
3535
# use std::error::Error;
36+
# use std::sync::Arc;
3637
# async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
3738
use scylla::query::Query;
3839
use scylla::transport::ExecutionProfile;
3940
use scylla::transport::retry_policy::DefaultRetryPolicy;
4041

42+
// Create a Query manually and set the retry policy
43+
let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)");
44+
my_query.set_retry_policy(Some(Arc::new(DefaultRetryPolicy::new())));
45+
46+
// You can also set retry policy in an execution profile
4147
let handle = ExecutionProfile::builder()
4248
.retry_policy(Box::new(DefaultRetryPolicy::new()))
4349
.build()
4450
.into_handle();
45-
46-
// Create a Query manually and set the retry policy
47-
let mut my_query: Query = Query::new("INSERT INTO ks.tab (a) VALUES(?)");
4851
my_query.set_execution_profile_handle(Some(handle));
4952

5053
// Run the query using this retry policy
@@ -59,21 +62,23 @@ To use in a [prepared query](../queries/prepared.md):
5962
# extern crate scylla;
6063
# use scylla::Session;
6164
# use std::error::Error;
65+
# use std::sync::Arc;
6266
# async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
6367
use scylla::prepared_statement::PreparedStatement;
6468
use scylla::transport::ExecutionProfile;
6569
use scylla::transport::retry_policy::DefaultRetryPolicy;
6670

67-
let handle = ExecutionProfile::builder()
68-
.retry_policy(Box::new(DefaultRetryPolicy::new()))
69-
.build()
70-
.into_handle();
71-
7271
// Create PreparedStatement manually and set the retry policy
7372
let mut prepared: PreparedStatement = session
7473
.prepare("INSERT INTO ks.tab (a) VALUES(?)")
7574
.await?;
75+
prepared.set_retry_policy(Some(Arc::new(DefaultRetryPolicy::new())));
7676

77+
// You can also set retry policy in an execution profile
78+
let handle = ExecutionProfile::builder()
79+
.retry_policy(Box::new(DefaultRetryPolicy::new()))
80+
.build()
81+
.into_handle();
7782
prepared.set_execution_profile_handle(Some(handle));
7883

7984
// Run the query using this retry policy

scylla/src/statement/batch.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::sync::Arc;
22

33
use crate::history::HistoryListener;
4+
use crate::retry_policy::RetryPolicy;
45
use crate::statement::{prepared_statement::PreparedStatement, query::Query};
56
use crate::transport::execution_profile::ExecutionProfileHandle;
67

@@ -15,6 +16,9 @@ pub use crate::frame::request::batch::BatchType;
1516
pub struct Batch {
1617
pub(crate) config: StatementConfig,
1718

19+
// TODO: Move this after #701 is fixed
20+
retry_policy: Option<Arc<dyn RetryPolicy>>,
21+
1822
pub statements: Vec<BatchStatement>,
1923
batch_type: BatchType,
2024
}
@@ -108,6 +112,18 @@ impl Batch {
108112
self.config.timestamp
109113
}
110114

115+
/// Set the retry policy for this batch, overriding the one from execution profile if not None.
116+
#[inline]
117+
pub fn set_retry_policy(&mut self, retry_policy: Option<Arc<dyn RetryPolicy>>) {
118+
self.retry_policy = retry_policy;
119+
}
120+
121+
/// Get the retry policy set for the batch.
122+
#[inline]
123+
pub fn get_retry_policy(&self) -> Option<&Arc<dyn RetryPolicy>> {
124+
self.retry_policy.as_ref()
125+
}
126+
111127
/// Sets the listener capable of listening what happens during query execution.
112128
pub fn set_history_listener(&mut self, history_listener: Arc<dyn HistoryListener>) {
113129
self.config.history_listener = Some(history_listener);
@@ -134,6 +150,7 @@ impl Default for Batch {
134150
fn default() -> Self {
135151
Self {
136152
statements: Vec::new(),
153+
retry_policy: None,
137154
batch_type: BatchType::Logged,
138155
config: Default::default(),
139156
}

scylla/src/statement/prepared_statement.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::frame::response::result::PreparedMetadata;
1111
use crate::frame::types::{Consistency, SerialConsistency};
1212
use crate::frame::value::SerializedValues;
1313
use crate::history::HistoryListener;
14+
use crate::retry_policy::RetryPolicy;
1415
use crate::transport::execution_profile::ExecutionProfileHandle;
1516
use crate::transport::partitioner::PartitionerName;
1617

@@ -20,6 +21,9 @@ pub struct PreparedStatement {
2021
pub(crate) config: StatementConfig,
2122
pub prepare_tracing_ids: Vec<Uuid>,
2223

24+
// TODO: Move this after #701 is fixed
25+
retry_policy: Option<Arc<dyn RetryPolicy>>,
26+
2327
id: Bytes,
2428
shared: Arc<PreparedStatementSharedData>,
2529
page_size: Option<i32>,
@@ -37,6 +41,7 @@ impl Clone for PreparedStatement {
3741
fn clone(&self) -> Self {
3842
Self {
3943
config: self.config.clone(),
44+
retry_policy: self.retry_policy.clone(),
4045
prepare_tracing_ids: Vec::new(),
4146
id: self.id.clone(),
4247
shared: self.shared.clone(),
@@ -53,6 +58,7 @@ impl PreparedStatement {
5358
is_lwt: bool,
5459
metadata: PreparedMetadata,
5560
statement: String,
61+
retry_policy: Option<Arc<dyn RetryPolicy>>,
5662
page_size: Option<i32>,
5763
config: StatementConfig,
5864
) -> Self {
@@ -62,6 +68,7 @@ impl PreparedStatement {
6268
metadata,
6369
statement,
6470
}),
71+
retry_policy,
6572
prepare_tracing_ids: Vec::new(),
6673
page_size,
6774
config,
@@ -289,6 +296,18 @@ impl PreparedStatement {
289296
&self.partitioner_name
290297
}
291298

299+
/// Set the retry policy for this statement, overriding the one from execution profile if not None.
300+
#[inline]
301+
pub fn set_retry_policy(&mut self, retry_policy: Option<Arc<dyn RetryPolicy>>) {
302+
self.retry_policy = retry_policy;
303+
}
304+
305+
/// Get the retry policy set for the statement.
306+
#[inline]
307+
pub fn get_retry_policy(&self) -> Option<&Arc<dyn RetryPolicy>> {
308+
self.retry_policy.as_ref()
309+
}
310+
292311
/// Sets the listener capable of listening what happens during query execution.
293312
pub fn set_history_listener(&mut self, history_listener: Arc<dyn HistoryListener>) {
294313
self.config.history_listener = Some(history_listener);

scylla/src/statement/query.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::StatementConfig;
22
use crate::frame::types::{Consistency, SerialConsistency};
33
use crate::history::HistoryListener;
4+
use crate::retry_policy::RetryPolicy;
45
use crate::transport::execution_profile::ExecutionProfileHandle;
56
use std::sync::Arc;
67
use std::time::Duration;
@@ -12,6 +13,9 @@ use std::time::Duration;
1213
pub struct Query {
1314
pub(crate) config: StatementConfig,
1415

16+
// TODO: Move this after #701 is fixed
17+
retry_policy: Option<Arc<dyn RetryPolicy>>,
18+
1519
pub contents: String,
1620
page_size: Option<i32>,
1721
}
@@ -21,6 +25,7 @@ impl Query {
2125
pub fn new(query_text: impl Into<String>) -> Self {
2226
Self {
2327
contents: query_text.into(),
28+
retry_policy: None,
2429
page_size: None,
2530
config: Default::default(),
2631
}
@@ -123,6 +128,18 @@ impl Query {
123128
self.config.request_timeout
124129
}
125130

131+
/// Set the retry policy for this statement, overriding the one from execution profile if not None.
132+
#[inline]
133+
pub fn set_retry_policy(&mut self, retry_policy: Option<Arc<dyn RetryPolicy>>) {
134+
self.retry_policy = retry_policy;
135+
}
136+
137+
/// Get the retry policy set for the statement.
138+
#[inline]
139+
pub fn get_retry_policy(&self) -> Option<&Arc<dyn RetryPolicy>> {
140+
self.retry_policy.as_ref()
141+
}
142+
126143
/// Sets the listener capable of listening what happens during query execution.
127144
pub fn set_history_listener(&mut self, history_listener: Arc<dyn HistoryListener>) {
128145
self.config.history_listener = Some(history_listener);

scylla/src/transport/caching_session.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,13 @@ where
166166

167167
if let Some(raw) = self.cache.get(&query.contents) {
168168
let page_size = query.get_page_size();
169+
let retry_policy = query.get_retry_policy().cloned();
169170
let mut stmt = PreparedStatement::new(
170171
raw.id.clone(),
171172
raw.is_confirmed_lwt,
172173
raw.metadata.clone(),
173174
query.contents,
175+
retry_policy,
174176
page_size,
175177
query.config,
176178
);

scylla/src/transport/connection.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ impl Connection {
423423
.prepared_flags_contain_lwt_mark(p.prepared_metadata.flags as u32),
424424
p.prepared_metadata,
425425
query.contents.clone(),
426+
query.get_retry_policy().cloned(),
426427
query.get_page_size(),
427428
query.config.clone(),
428429
),

scylla/src/transport/iterator.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,11 @@ impl RowIterator {
151151
.serial_consistency
152152
.unwrap_or(execution_profile.serial_consistency);
153153

154-
let retry_session = execution_profile.retry_policy.new_session();
154+
let retry_session = query
155+
.get_retry_policy()
156+
.map(|rp| &**rp)
157+
.unwrap_or(&*execution_profile.retry_policy)
158+
.new_session();
155159

156160
let parent_span = tracing::Span::current();
157161
let worker_task = async move {
@@ -222,7 +226,12 @@ impl RowIterator {
222226
.config
223227
.serial_consistency
224228
.unwrap_or(config.execution_profile.serial_consistency);
225-
let retry_session = config.execution_profile.retry_policy.new_session();
229+
let retry_session = config
230+
.prepared
231+
.get_retry_policy()
232+
.map(|rp| &**rp)
233+
.unwrap_or(&*config.execution_profile.retry_policy)
234+
.new_session();
226235

227236
let parent_span = tracing::Span::current();
228237
let worker_task = async move {

scylla/src/transport/session.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::cloud::CloudConfig;
77
use crate::frame::types::LegacyConsistency;
88
use crate::history;
99
use crate::history::HistoryListener;
10+
use crate::retry_policy::RetryPolicy;
1011
use arc_swap::ArcSwapOption;
1112
use async_trait::async_trait;
1213
use bytes::Bytes;
@@ -581,6 +582,7 @@ impl Session {
581582
.run_query(
582583
RoutingInfo::default(),
583584
&query.config,
585+
query.get_retry_policy().map(|rp| &**rp),
584586
|node: Arc<Node>| async move { node.random_connection().await },
585587
|connection: Arc<Connection>,
586588
consistency: Consistency,
@@ -920,6 +922,7 @@ impl Session {
920922
.run_query(
921923
statement_info,
922924
&prepared.config,
925+
prepared.get_retry_policy().map(|rp| &**rp),
923926
|node: Arc<Node>| async move {
924927
match token {
925928
Some(token) => node.connection_for_token(token).await,
@@ -1117,6 +1120,7 @@ impl Session {
11171120
.run_query(
11181121
statement_info,
11191122
&batch.config,
1123+
batch.get_retry_policy().map(|rp| &**rp),
11201124
|node: Arc<Node>| async move {
11211125
match first_value_token {
11221126
Some(first_value_token) => {
@@ -1430,6 +1434,7 @@ impl Session {
14301434
&'a self,
14311435
statement_info: RoutingInfo<'a>,
14321436
statement_config: &'a StatementConfig,
1437+
statement_retry_policy: Option<&dyn RetryPolicy>,
14331438
choose_connection: impl Fn(Arc<Node>) -> ConnFut,
14341439
do_query: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
14351440
request_span: &'a RequestSpan,
@@ -1479,7 +1484,7 @@ impl Session {
14791484
}
14801485
}
14811486

1482-
let retry_policy = &execution_profile.retry_policy;
1487+
let retry_policy = statement_retry_policy.unwrap_or(&*execution_profile.retry_policy);
14831488

14841489
let speculative_policy = execution_profile.speculative_execution_policy.as_ref();
14851490

@@ -1754,6 +1759,7 @@ impl Session {
17541759
.run_query(
17551760
info,
17561761
&config,
1762+
None, // No specific retry policy needed for schema agreement
17571763
|node: Arc<Node>| async move { node.random_connection().await },
17581764
do_query,
17591765
&span,

0 commit comments

Comments
 (0)