Skip to content

Commit 228ec97

Browse files
committed
Extended RetryDecision with CL for retries
From now on, RetryPolicies can decide not only about whether the retry should be attempted, but also about the consistency level of such an attempt.
1 parent c449270 commit 228ec97

File tree

4 files changed

+186
-61
lines changed

4 files changed

+186
-61
lines changed

scylla/src/transport/connection.rs

Lines changed: 94 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -378,23 +378,55 @@ impl Connection {
378378
values: impl ValueList,
379379
) -> Result<QueryResult, QueryError> {
380380
let query: Query = query.into();
381-
self.query(&query, &values, None).await?.into_query_result()
381+
let consistency = query
382+
.config
383+
.determine_consistency(self.config.default_consistency);
384+
self.query_single_page_with_consistency(query, &values, consistency)
385+
.await
386+
}
387+
388+
pub async fn query_single_page_with_consistency(
389+
&self,
390+
query: impl Into<Query>,
391+
values: impl ValueList,
392+
consistency: Consistency,
393+
) -> Result<QueryResult, QueryError> {
394+
let query: Query = query.into();
395+
self.query_with_consistency(&query, &values, consistency, None)
396+
.await?
397+
.into_query_result()
382398
}
383399

384400
pub async fn query(
385401
&self,
386402
query: &Query,
387403
values: impl ValueList,
388404
paging_state: Option<Bytes>,
405+
) -> Result<QueryResponse, QueryError> {
406+
self.query_with_consistency(
407+
query,
408+
values,
409+
query
410+
.config
411+
.determine_consistency(self.config.default_consistency),
412+
paging_state,
413+
)
414+
.await
415+
}
416+
417+
pub async fn query_with_consistency(
418+
&self,
419+
query: &Query,
420+
values: impl ValueList,
421+
consistency: Consistency,
422+
paging_state: Option<Bytes>,
389423
) -> Result<QueryResponse, QueryError> {
390424
let serialized_values = values.serialized()?;
391425

392426
let query_frame = query::Query {
393427
contents: &query.contents,
394428
parameters: query::QueryParameters {
395-
consistency: query
396-
.config
397-
.determine_consistency(self.config.default_consistency),
429+
consistency,
398430
serial_consistency: query.get_serial_consistency(),
399431
values: &serialized_values,
400432
page_size: query.get_page_size(),
@@ -412,6 +444,22 @@ impl Connection {
412444
&self,
413445
query: &Query,
414446
values: impl ValueList,
447+
) -> Result<QueryResult, QueryError> {
448+
self.query_all_with_consistency(
449+
query,
450+
values,
451+
query
452+
.config
453+
.determine_consistency(self.config.default_consistency),
454+
)
455+
.await
456+
}
457+
458+
pub async fn query_all_with_consistency(
459+
&self,
460+
query: &Query,
461+
values: impl ValueList,
462+
consistency: Consistency,
415463
) -> Result<QueryResult, QueryError> {
416464
if query.get_page_size().is_none() {
417465
// Page size should be set when someone wants to use paging
@@ -426,10 +474,14 @@ impl Connection {
426474
let serialized_values = values.serialized()?;
427475
let mut paging_state: Option<Bytes> = None;
428476

477+
query
478+
.config
479+
.determine_consistency(self.config.default_consistency);
480+
429481
loop {
430482
// Send next paged query
431483
let mut cur_result: QueryResult = self
432-
.query(query, &serialized_values, paging_state)
484+
.query_with_consistency(query, &serialized_values, consistency, paging_state)
433485
.await?
434486
.into_query_result()?;
435487

@@ -462,15 +514,31 @@ impl Connection {
462514
prepared_statement: &PreparedStatement,
463515
values: impl ValueList,
464516
paging_state: Option<Bytes>,
517+
) -> Result<QueryResponse, QueryError> {
518+
self.execute_with_consistency(
519+
prepared_statement,
520+
values,
521+
prepared_statement
522+
.config
523+
.determine_consistency(self.config.default_consistency),
524+
paging_state,
525+
)
526+
.await
527+
}
528+
529+
pub async fn execute_with_consistency(
530+
&self,
531+
prepared_statement: &PreparedStatement,
532+
values: impl ValueList,
533+
consistency: Consistency,
534+
paging_state: Option<Bytes>,
465535
) -> Result<QueryResponse, QueryError> {
466536
let serialized_values = values.serialized()?;
467537

468538
let execute_frame = execute::Execute {
469539
id: prepared_statement.get_id().to_owned(),
470540
parameters: query::QueryParameters {
471-
consistency: prepared_statement
472-
.config
473-
.determine_consistency(self.config.default_consistency),
541+
consistency,
474542
serial_consistency: prepared_statement.get_serial_consistency(),
475543
values: &serialized_values,
476544
page_size: prepared_statement.get_page_size(),
@@ -544,10 +612,27 @@ impl Connection {
544612
}
545613
}
546614

615+
#[allow(dead_code)]
547616
pub async fn batch(
548617
&self,
549618
batch: &Batch,
550619
values: impl BatchValues,
620+
) -> Result<BatchResult, QueryError> {
621+
self.batch_with_consistency(
622+
batch,
623+
values,
624+
batch
625+
.config
626+
.determine_consistency(self.config.default_consistency),
627+
)
628+
.await
629+
}
630+
631+
pub async fn batch_with_consistency(
632+
&self,
633+
batch: &Batch,
634+
values: impl BatchValues,
635+
consistency: Consistency,
551636
) -> Result<BatchResult, QueryError> {
552637
let statements_count = batch.statements.len();
553638
if statements_count != values.len() {
@@ -569,9 +654,7 @@ impl Connection {
569654
statements_count,
570655
values,
571656
batch_type: batch.get_type(),
572-
consistency: batch
573-
.config
574-
.determine_consistency(self.config.default_consistency),
657+
consistency,
575658
serial_consistency: batch.get_serial_consistency(),
576659
timestamp: batch.get_timestamp(),
577660
};

scylla/src/transport/iterator.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,12 @@ impl RowIterator {
145145

146146
let choose_connection = |node: Arc<Node>| async move { node.random_connection().await };
147147

148-
let page_query = |connection: Arc<Connection>, paging_state: Option<Bytes>| async move {
149-
connection.query(query_ref, values_ref, paging_state).await
148+
let page_query = |connection: Arc<Connection>,
149+
consistency: Consistency,
150+
paging_state: Option<Bytes>| async move {
151+
connection
152+
.query_with_consistency(query_ref, values_ref, consistency, paging_state)
153+
.await
150154
};
151155

152156
let worker = RowIteratorWorker {
@@ -210,9 +214,11 @@ impl RowIterator {
210214
}
211215
};
212216

213-
let page_query = |connection: Arc<Connection>, paging_state: Option<Bytes>| async move {
217+
let page_query = |connection: Arc<Connection>,
218+
consistency: Consistency,
219+
paging_state: Option<Bytes>| async move {
214220
connection
215-
.execute(prepared_ref, values_ref, paging_state)
221+
.execute_with_consistency(prepared_ref, values_ref, consistency, paging_state)
216222
.await
217223
};
218224

@@ -291,14 +297,15 @@ impl<ConnFunc, ConnFut, QueryFunc, QueryFut> RowIteratorWorker<'_, ConnFunc, Que
291297
where
292298
ConnFunc: Fn(Arc<Node>) -> ConnFut,
293299
ConnFut: Future<Output = Result<Arc<Connection>, QueryError>>,
294-
QueryFunc: Fn(Arc<Connection>, Option<Bytes>) -> QueryFut,
300+
QueryFunc: Fn(Arc<Connection>, Consistency, Option<Bytes>) -> QueryFut,
295301
QueryFut: Future<Output = Result<QueryResponse, QueryError>>,
296302
{
297303
async fn work(mut self, cluster_data: Arc<ClusterData>) {
298304
let query_plan = self.load_balancer.plan(&self.statement_info, &cluster_data);
299305

300306
let mut last_error: QueryError =
301307
QueryError::ProtocolError("Empty query plan - driver bug!");
308+
let mut current_consistency: Consistency = self.query_consistency;
302309

303310
'nodes_in_plan: for node in query_plan {
304311
let span = trace_span!("Executing query", node = node.address.to_string().as_str());
@@ -324,8 +331,10 @@ where
324331
'same_node_retries: loop {
325332
trace!(parent: &span, "Execution started");
326333
// Query pages until an error occurs
327-
let queries_result: Result<(), QueryError> =
328-
self.query_pages(&connection).instrument(span.clone()).await;
334+
let queries_result: Result<(), QueryError> = self
335+
.query_pages(&connection, current_consistency)
336+
.instrument(span.clone())
337+
.await;
329338

330339
last_error = match queries_result {
331340
Ok(()) => {
@@ -355,12 +364,14 @@ where
355364
retry_decision = format!("{:?}", retry_decision).as_str()
356365
);
357366
match retry_decision {
358-
RetryDecision::RetrySameNode => {
367+
RetryDecision::RetrySameNode(cl) => {
359368
self.metrics.inc_retries_num();
369+
current_consistency = cl;
360370
continue 'same_node_retries;
361371
}
362-
RetryDecision::RetryNextNode => {
372+
RetryDecision::RetryNextNode(cl) => {
363373
self.metrics.inc_retries_num();
374+
current_consistency = cl;
364375
continue 'nodes_in_plan;
365376
}
366377
RetryDecision::DontRetry => break 'nodes_in_plan,
@@ -373,7 +384,11 @@ where
373384
}
374385

375386
// Given a working connection query as many pages as possible until the first error
376-
async fn query_pages(&mut self, connection: &Arc<Connection>) -> Result<(), QueryError> {
387+
async fn query_pages(
388+
&mut self,
389+
connection: &Arc<Connection>,
390+
consistency: Consistency,
391+
) -> Result<(), QueryError> {
377392
loop {
378393
self.metrics.inc_total_paged_queries();
379394
let query_start = std::time::Instant::now();
@@ -383,7 +398,8 @@ where
383398
"Sending"
384399
);
385400
let query_response: QueryResponse =
386-
(self.page_query)(connection.clone(), self.paging_state.clone()).await?;
401+
(self.page_query)(connection.clone(), consistency, self.paging_state.clone())
402+
.await?;
387403

388404
match query_response.response {
389405
Response::Result(result::Result::Rows(mut rows)) => {

0 commit comments

Comments
 (0)