Skip to content

Commit 57ad5ad

Browse files
authored
Merge pull request #1156 from muzarski/rename-run-do-execute-query
session: adjust naming in internal session functions
2 parents 089f26f + f6a9439 commit 57ad5ad

File tree

1 file changed

+74
-68
lines changed

1 file changed

+74
-68
lines changed

scylla/src/transport/session.rs

Lines changed: 74 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ impl Default for SessionConfig {
472472
}
473473
}
474474

475-
pub(crate) enum RunQueryResult<ResT> {
475+
pub(crate) enum RunRequestResult<ResT> {
476476
IgnoredWriteError,
477477
Completed(ResT),
478478
}
@@ -1203,8 +1203,8 @@ where
12031203

12041204
let span = RequestSpan::new_query(&query.contents);
12051205
let span_ref = &span;
1206-
let run_query_result = self
1207-
.run_query(
1206+
let run_request_result = self
1207+
.run_request(
12081208
statement_info,
12091209
&query.config,
12101210
execution_profile,
@@ -1257,13 +1257,13 @@ where
12571257
.instrument(span.span().clone())
12581258
.await?;
12591259

1260-
let response = match run_query_result {
1261-
RunQueryResult::IgnoredWriteError => NonErrorQueryResponse {
1260+
let response = match run_request_result {
1261+
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
12621262
response: NonErrorResponse::Result(result::Result::Void),
12631263
tracing_id: None,
12641264
warnings: Vec::new(),
12651265
},
1266-
RunQueryResult::Completed(response) => response,
1266+
RunRequestResult::Completed(response) => response,
12671267
};
12681268

12691269
self.handle_set_keyspace_response(&response).await?;
@@ -1526,8 +1526,8 @@ where
15261526
}
15271527
}
15281528

1529-
let run_query_result: RunQueryResult<NonErrorQueryResponse> = self
1530-
.run_query(
1529+
let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
1530+
.run_request(
15311531
statement_info,
15321532
&prepared.config,
15331533
execution_profile,
@@ -1558,13 +1558,13 @@ where
15581558
.instrument(span.span().clone())
15591559
.await?;
15601560

1561-
let response = match run_query_result {
1562-
RunQueryResult::IgnoredWriteError => NonErrorQueryResponse {
1561+
let response = match run_request_result {
1562+
RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
15631563
response: NonErrorResponse::Result(result::Result::Void),
15641564
tracing_id: None,
15651565
warnings: Vec::new(),
15661566
},
1567-
RunQueryResult::Completed(response) => response,
1567+
RunRequestResult::Completed(response) => response,
15681568
};
15691569

15701570
self.handle_set_keyspace_response(&response).await?;
@@ -1650,8 +1650,8 @@ where
16501650

16511651
let span = RequestSpan::new_batch();
16521652

1653-
let run_query_result = self
1654-
.run_query(
1653+
let run_request_result = self
1654+
.run_request(
16551655
statement_info,
16561656
&batch.config,
16571657
execution_profile,
@@ -1678,9 +1678,9 @@ where
16781678
.instrument(span.span().clone())
16791679
.await?;
16801680

1681-
let result = match run_query_result {
1682-
RunQueryResult::IgnoredWriteError => QueryResult::mock_empty(),
1683-
RunQueryResult::Completed(result) => {
1681+
let result = match run_request_result {
1682+
RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(),
1683+
RunRequestResult::Completed(result) => {
16841684
span.record_result_fields(&result);
16851685
result
16861686
}
@@ -1916,26 +1916,27 @@ where
19161916
Ok(Some(tracing_info))
19171917
}
19181918

1919-
// This method allows to easily run a query using load balancing, retry policy etc.
1920-
// Requires some information about the query and a closure.
1921-
// The closure is used to do the query itself on a connection.
1922-
// - query will use connection.query()
1923-
// - execute will use connection.execute()
1924-
// If this query closure fails with some errors retry policy is used to perform retries
1925-
// On success this query's result is returned
1919+
/// This method allows to easily run a request using load balancing, retry policy etc.
1920+
/// Requires some information about the request and a closure.
1921+
/// The closure is used to execute the request once on a chosen connection.
1922+
/// - query will use connection.query()
1923+
/// - execute will use connection.execute()
1924+
///
1925+
/// If this closure fails with some errors, retry policy is used to perform retries.
1926+
/// On success, this request's result is returned.
19261927
// I tried to make this closures take a reference instead of an Arc but failed
19271928
// maybe once async closures get stabilized this can be fixed
1928-
async fn run_query<'a, QueryFut, ResT>(
1929+
async fn run_request<'a, QueryFut, ResT>(
19291930
&'a self,
19301931
statement_info: RoutingInfo<'a>,
19311932
statement_config: &'a StatementConfig,
19321933
execution_profile: Arc<ExecutionProfileInner>,
1933-
do_query: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
1934+
run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
19341935
request_span: &'a RequestSpan,
1935-
) -> Result<RunQueryResult<ResT>, QueryError>
1936+
) -> Result<RunRequestResult<ResT>, QueryError>
19361937
where
19371938
QueryFut: Future<Output = Result<ResT, QueryError>>,
1938-
ResT: AllowedRunQueryResTType,
1939+
ResT: AllowedRunRequestResTType,
19391940
{
19401941
let history_listener_and_id: Option<(&'a dyn HistoryListener, history::QueryId)> =
19411942
statement_config
@@ -1947,11 +1948,11 @@ where
19471948

19481949
let runner = async {
19491950
let cluster_data = self.cluster.get_data();
1950-
let query_plan =
1951+
let request_plan =
19511952
load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_data);
19521953

1953-
// If a speculative execution policy is used to run query, query_plan has to be shared
1954-
// between different async functions. This struct helps to wrap query_plan in mutex so it
1954+
// If a speculative execution policy is used to run request, request_plan has to be shared
1955+
// between different async functions. This struct helps to wrap request_plan in mutex so it
19551956
// can be shared safely.
19561957
struct SharedPlan<'a, I>
19571958
where
@@ -1980,11 +1981,11 @@ where
19801981

19811982
match speculative_policy {
19821983
Some(speculative) if statement_config.is_idempotent => {
1983-
let shared_query_plan = SharedPlan {
1984-
iter: std::sync::Mutex::new(query_plan),
1984+
let shared_request_plan = SharedPlan {
1985+
iter: std::sync::Mutex::new(request_plan),
19851986
};
19861987

1987-
let execute_query_generator = |is_speculative: bool| {
1988+
let request_runner_generator = |is_speculative: bool| {
19881989
let history_data: Option<HistoryData> = history_listener_and_id
19891990
.as_ref()
19901991
.map(|(history_listener, query_id)| {
@@ -2005,11 +2006,11 @@ where
20052006
request_span.inc_speculative_executions();
20062007
}
20072008

2008-
self.execute_query(
2009-
&shared_query_plan,
2010-
&do_query,
2009+
self.run_request_speculative_fiber(
2010+
&shared_request_plan,
2011+
&run_request_once,
20112012
&execution_profile,
2012-
ExecuteQueryContext {
2013+
ExecuteRequestContext {
20132014
is_idempotent: statement_config.is_idempotent,
20142015
consistency_set_on_statement: statement_config.consistency,
20152016
retry_session: retry_policy.new_session(),
@@ -2027,7 +2028,7 @@ where
20272028
speculative_execution::execute(
20282029
speculative.as_ref(),
20292030
&context,
2030-
execute_query_generator,
2031+
request_runner_generator,
20312032
)
20322033
.await
20332034
}
@@ -2040,11 +2041,11 @@ where
20402041
query_id: *query_id,
20412042
speculative_id: None,
20422043
});
2043-
self.execute_query(
2044-
query_plan,
2045-
&do_query,
2044+
self.run_request_speculative_fiber(
2045+
request_plan,
2046+
&run_request_once,
20462047
&execution_profile,
2047-
ExecuteQueryContext {
2048+
ExecuteRequestContext {
20482049
is_idempotent: statement_config.is_idempotent,
20492050
consistency_set_on_statement: statement_config.consistency,
20502051
retry_session: retry_policy.new_session(),
@@ -2085,24 +2086,29 @@ where
20852086
result
20862087
}
20872088

2088-
async fn execute_query<'a, QueryFut, ResT>(
2089+
/// Executes the closure `run_request_once`, provided the load balancing plan and some information
2090+
/// about the request, including retry session.
2091+
/// If request fails, retry session is used to perform retries.
2092+
///
2093+
/// Returns None, if provided plan is empty.
2094+
async fn run_request_speculative_fiber<'a, QueryFut, ResT>(
20892095
&'a self,
2090-
query_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
2091-
do_query: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
2096+
request_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
2097+
run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
20922098
execution_profile: &ExecutionProfileInner,
2093-
mut context: ExecuteQueryContext<'a>,
2094-
) -> Option<Result<RunQueryResult<ResT>, QueryError>>
2099+
mut context: ExecuteRequestContext<'a>,
2100+
) -> Option<Result<RunRequestResult<ResT>, QueryError>>
20952101
where
20962102
QueryFut: Future<Output = Result<ResT, QueryError>>,
2097-
ResT: AllowedRunQueryResTType,
2103+
ResT: AllowedRunRequestResTType,
20982104
{
20992105
let mut last_error: Option<QueryError> = None;
21002106
let mut current_consistency: Consistency = context
21012107
.consistency_set_on_statement
21022108
.unwrap_or(execution_profile.consistency);
21032109

2104-
'nodes_in_plan: for (node, shard) in query_plan {
2105-
let span = trace_span!("Executing query", node = %node.address);
2110+
'nodes_in_plan: for (node, shard) in request_plan {
2111+
let span = trace_span!("Executing request", node = %node.address);
21062112
'same_node_retries: loop {
21072113
trace!(parent: &span, "Execution started");
21082114
let connection = match node.connection_for_shard(shard).await {
@@ -2114,14 +2120,14 @@ where
21142120
"Choosing connection failed"
21152121
);
21162122
last_error = Some(e.into());
2117-
// Broken connection doesn't count as a failed query, don't log in metrics
2123+
// Broken connection doesn't count as a failed request, don't log in metrics
21182124
continue 'nodes_in_plan;
21192125
}
21202126
};
21212127
context.request_span.record_shard_id(&connection);
21222128

21232129
self.metrics.inc_total_nonpaged_queries();
2124-
let query_start = std::time::Instant::now();
2130+
let request_start = std::time::Instant::now();
21252131

21262132
trace!(
21272133
parent: &span,
@@ -2130,29 +2136,29 @@ where
21302136
);
21312137
let attempt_id: Option<history::AttemptId> =
21322138
context.log_attempt_start(connection.get_connect_address());
2133-
let query_result: Result<ResT, QueryError> =
2134-
do_query(connection, current_consistency, execution_profile)
2139+
let request_result: Result<ResT, QueryError> =
2140+
run_request_once(connection, current_consistency, execution_profile)
21352141
.instrument(span.clone())
21362142
.await;
21372143

2138-
let elapsed = query_start.elapsed();
2139-
last_error = match query_result {
2144+
let elapsed = request_start.elapsed();
2145+
last_error = match request_result {
21402146
Ok(response) => {
2141-
trace!(parent: &span, "Query succeeded");
2147+
trace!(parent: &span, "Request succeeded");
21422148
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
21432149
context.log_attempt_success(&attempt_id);
21442150
execution_profile.load_balancing_policy.on_query_success(
21452151
context.query_info,
21462152
elapsed,
21472153
node,
21482154
);
2149-
return Some(Ok(RunQueryResult::Completed(response)));
2155+
return Some(Ok(RunRequestResult::Completed(response)));
21502156
}
21512157
Err(e) => {
21522158
trace!(
21532159
parent: &span,
21542160
last_error = %e,
2155-
"Query failed"
2161+
"Request failed"
21562162
);
21572163
self.metrics.inc_failed_nonpaged_queries();
21582164
execution_profile.load_balancing_policy.on_query_failure(
@@ -2195,7 +2201,7 @@ where
21952201
RetryDecision::DontRetry => break 'nodes_in_plan,
21962202

21972203
RetryDecision::IgnoreWriteError => {
2198-
return Some(Ok(RunQueryResult::IgnoredWriteError))
2204+
return Some(Ok(RunRequestResult::IgnoredWriteError))
21992205
}
22002206
};
22012207
}
@@ -2243,22 +2249,22 @@ where
22432249
}
22442250
}
22452251

2246-
// run_query, execute_query, etc have a template type called ResT.
2252+
// run_request, run_request_speculative_fiber, etc have a template type called ResT.
22472253
// There was a bug where ResT was set to QueryResponse, which could
22482254
// be an error response. This was not caught by retry policy which
22492255
// assumed all errors would come from analyzing Result<ResT, QueryError>.
22502256
// This trait is a guard to make sure that this mistake doesn't
22512257
// happen again.
2252-
// When using run_query make sure that the ResT type is NOT able
2258+
// When using run_request make sure that the ResT type is NOT able
22532259
// to contain any errors.
22542260
// See https://github.com/scylladb/scylla-rust-driver/issues/501
2255-
pub(crate) trait AllowedRunQueryResTType {}
2261+
pub(crate) trait AllowedRunRequestResTType {}
22562262

2257-
impl AllowedRunQueryResTType for Uuid {}
2258-
impl AllowedRunQueryResTType for QueryResult {}
2259-
impl AllowedRunQueryResTType for NonErrorQueryResponse {}
2263+
impl AllowedRunRequestResTType for Uuid {}
2264+
impl AllowedRunRequestResTType for QueryResult {}
2265+
impl AllowedRunRequestResTType for NonErrorQueryResponse {}
22602266

2261-
struct ExecuteQueryContext<'a> {
2267+
struct ExecuteRequestContext<'a> {
22622268
is_idempotent: bool,
22632269
consistency_set_on_statement: Option<Consistency>,
22642270
retry_session: Box<dyn RetrySession>,
@@ -2273,7 +2279,7 @@ struct HistoryData<'a> {
22732279
speculative_id: Option<history::SpeculativeId>,
22742280
}
22752281

2276-
impl ExecuteQueryContext<'_> {
2282+
impl ExecuteRequestContext<'_> {
22772283
fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
22782284
self.history_data.as_ref().map(|hd| {
22792285
hd.listener

0 commit comments

Comments
 (0)