Skip to content

Commit 918e522

Browse files
authored
Merge pull request #1159 from muzarski/history-listener-errors
errors: narrow error types accepted by `HistoryListener`
2 parents 4a9367c + 5e6c26b commit 918e522

File tree

6 files changed

+436
-357
lines changed

6 files changed

+436
-357
lines changed

scylla/src/client/pager.rs

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::cluster::{ClusterState, NodeRef};
2929
#[allow(deprecated)]
3030
use crate::cql_to_rust::{FromRow, FromRowError};
3131
use crate::deserialize::DeserializeOwnedRow;
32-
use crate::errors::ProtocolError;
32+
use crate::errors::{ProtocolError, RequestError};
3333
use crate::errors::{QueryError, RequestAttemptError};
3434
use crate::frame::response::result;
3535
use crate::network::Connection;
@@ -148,7 +148,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
148148
paging_state: PagingState,
149149

150150
history_listener: Option<Arc<dyn HistoryListener>>,
151-
current_query_id: Option<history::QueryId>,
151+
current_request_id: Option<history::RequestId>,
152152
current_attempt_id: Option<history::AttemptId>,
153153

154154
parent_span: tracing::Span,
@@ -168,10 +168,10 @@ where
168168
let query_plan =
169169
load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_data);
170170

171-
let mut last_error: QueryError = QueryError::EmptyPlan;
171+
let mut last_error: RequestError = RequestError::EmptyPlan;
172172
let mut current_consistency: Consistency = self.query_consistency;
173173

174-
self.log_query_start();
174+
self.log_request_start();
175175

176176
'nodes_in_plan: for (node, shard) in query_plan {
177177
let span =
@@ -235,8 +235,9 @@ where
235235
retry_decision = ?retry_decision
236236
);
237237

238-
last_error = request_error.into_query_error();
239-
self.log_attempt_error(&last_error, &retry_decision);
238+
self.log_attempt_error(&request_error, &retry_decision);
239+
240+
last_error = request_error.into();
240241

241242
match retry_decision {
242243
RetryDecision::RetrySameNode(cl) => {
@@ -265,9 +266,8 @@ where
265266
}
266267
}
267268

268-
// Send last_error to QueryPager - query failed fully
269-
self.log_query_error(&last_error);
270-
let (proof, _) = self.sender.send(Err(last_error)).await;
269+
self.log_request_error(&last_error);
270+
let (proof, _) = self.sender.send(Err(last_error.into_query_error())).await;
271271
proof
272272
}
273273

@@ -329,7 +329,7 @@ where
329329
}) => {
330330
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
331331
self.log_attempt_success();
332-
self.log_query_success();
332+
self.log_request_success();
333333
self.execution_profile
334334
.load_balancing_policy
335335
.on_request_success(&self.statement_info, elapsed, node);
@@ -357,7 +357,7 @@ where
357357

358358
// Query succeeded, reset retry policy for future retries
359359
self.retry_session.reset();
360-
self.log_query_start();
360+
self.log_request_start();
361361

362362
Ok(ControlFlow::Continue(()))
363363
}
@@ -392,41 +392,41 @@ where
392392
}
393393
}
394394

395-
fn log_query_start(&mut self) {
395+
fn log_request_start(&mut self) {
396396
let history_listener: &dyn HistoryListener = match &self.history_listener {
397397
Some(hl) => &**hl,
398398
None => return,
399399
};
400400

401-
self.current_query_id = Some(history_listener.log_query_start());
401+
self.current_request_id = Some(history_listener.log_request_start());
402402
}
403403

404-
fn log_query_success(&mut self) {
404+
fn log_request_success(&mut self) {
405405
let history_listener: &dyn HistoryListener = match &self.history_listener {
406406
Some(hl) => &**hl,
407407
None => return,
408408
};
409409

410-
let query_id: history::QueryId = match &self.current_query_id {
410+
let request_id: history::RequestId = match &self.current_request_id {
411411
Some(id) => *id,
412412
None => return,
413413
};
414414

415-
history_listener.log_query_success(query_id);
415+
history_listener.log_request_success(request_id);
416416
}
417417

418-
fn log_query_error(&mut self, error: &QueryError) {
418+
fn log_request_error(&mut self, error: &RequestError) {
419419
let history_listener: &dyn HistoryListener = match &self.history_listener {
420420
Some(hl) => &**hl,
421421
None => return,
422422
};
423423

424-
let query_id: history::QueryId = match &self.current_query_id {
424+
let request_id: history::RequestId = match &self.current_request_id {
425425
Some(id) => *id,
426426
None => return,
427427
};
428428

429-
history_listener.log_query_error(query_id, error);
429+
history_listener.log_request_error(request_id, error);
430430
}
431431

432432
fn log_attempt_start(&mut self, node_addr: SocketAddr) {
@@ -435,13 +435,13 @@ where
435435
None => return,
436436
};
437437

438-
let query_id: history::QueryId = match &self.current_query_id {
438+
let request_id: history::RequestId = match &self.current_request_id {
439439
Some(id) => *id,
440440
None => return,
441441
};
442442

443443
self.current_attempt_id =
444-
Some(history_listener.log_attempt_start(query_id, None, node_addr));
444+
Some(history_listener.log_attempt_start(request_id, None, node_addr));
445445
}
446446

447447
fn log_attempt_success(&mut self) {
@@ -458,7 +458,7 @@ where
458458
history_listener.log_attempt_success(attempt_id);
459459
}
460460

461-
fn log_attempt_error(&mut self, error: &QueryError, retry_decision: &RetryDecision) {
461+
fn log_attempt_error(&mut self, error: &RequestAttemptError, retry_decision: &RetryDecision) {
462462
let history_listener: &dyn HistoryListener = match &self.history_listener {
463463
Some(hl) => &**hl,
464464
None => return,
@@ -754,7 +754,7 @@ impl QueryPager {
754754
metrics,
755755
paging_state: PagingState::start(),
756756
history_listener: query.config.history_listener.clone(),
757-
current_query_id: None,
757+
current_request_id: None,
758758
current_attempt_id: None,
759759
parent_span,
760760
span_creator,
@@ -872,7 +872,7 @@ impl QueryPager {
872872
metrics: config.metrics,
873873
paging_state: PagingState::start(),
874874
history_listener: config.prepared.config.history_listener.clone(),
875-
current_query_id: None,
875+
current_request_id: None,
876876
current_attempt_id: None,
877877
parent_span,
878878
span_creator,

scylla/src/client/session.rs

Lines changed: 29 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ use crate::cluster::node::CloudEndpoint;
1515
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
1616
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
1717
use crate::errors::{
18-
BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, TracingProtocolError,
18+
BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, RequestError,
19+
TracingProtocolError,
1920
};
2021
use crate::frame::response::result;
2122
#[cfg(feature = "ssl")]
@@ -1849,11 +1850,11 @@ where
18491850
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
18501851
ResT: AllowedRunRequestResTType,
18511852
{
1852-
let history_listener_and_id: Option<(&'a dyn HistoryListener, history::QueryId)> =
1853+
let history_listener_and_id: Option<(&'a dyn HistoryListener, history::RequestId)> =
18531854
statement_config
18541855
.history_listener
18551856
.as_ref()
1856-
.map(|hl| (&**hl, hl.log_query_start()));
1857+
.map(|hl| (&**hl, hl.log_request_start()));
18571858

18581859
let load_balancer = &execution_profile.load_balancing_policy;
18591860

@@ -1899,16 +1900,18 @@ where
18991900
let request_runner_generator = |is_speculative: bool| {
19001901
let history_data: Option<HistoryData> = history_listener_and_id
19011902
.as_ref()
1902-
.map(|(history_listener, query_id)| {
1903+
.map(|(history_listener, request_id)| {
19031904
let speculative_id: Option<history::SpeculativeId> =
19041905
if is_speculative {
1905-
Some(history_listener.log_new_speculative_fiber(*query_id))
1906+
Some(
1907+
history_listener.log_new_speculative_fiber(*request_id),
1908+
)
19061909
} else {
19071910
None
19081911
};
19091912
HistoryData {
19101913
listener: *history_listener,
1911-
query_id: *query_id,
1914+
request_id: *request_id,
19121915
speculative_id,
19131916
}
19141917
});
@@ -1947,9 +1950,9 @@ where
19471950
let history_data: Option<HistoryData> =
19481951
history_listener_and_id
19491952
.as_ref()
1950-
.map(|(history_listener, query_id)| HistoryData {
1953+
.map(|(history_listener, request_id)| HistoryData {
19511954
listener: *history_listener,
1952-
query_id: *query_id,
1955+
request_id: *request_id,
19531956
speculative_id: None,
19541957
});
19551958
self.run_request_speculative_fiber(
@@ -1966,7 +1969,7 @@ where
19661969
},
19671970
)
19681971
.await
1969-
.unwrap_or(Err(QueryError::EmptyPlan))
1972+
.unwrap_or(Err(RequestError::EmptyPlan))
19701973
}
19711974
}
19721975
};
@@ -1977,24 +1980,19 @@ where
19771980
let result = match effective_timeout {
19781981
Some(timeout) => tokio::time::timeout(timeout, runner)
19791982
.await
1980-
.unwrap_or_else(|e| {
1981-
Err(QueryError::RequestTimeout(format!(
1982-
"Request took longer than {}ms: {}",
1983-
timeout.as_millis(),
1984-
e
1985-
)))
1986-
}),
1987-
None => runner.await,
1983+
.map(|res| res.map_err(RequestError::from))
1984+
.unwrap_or_else(|_| Err(RequestError::RequestTimeout(timeout))),
1985+
None => runner.await.map_err(RequestError::from),
19881986
};
19891987

1990-
if let Some((history_listener, query_id)) = history_listener_and_id {
1988+
if let Some((history_listener, request_id)) = history_listener_and_id {
19911989
match &result {
1992-
Ok(_) => history_listener.log_query_success(query_id),
1993-
Err(e) => history_listener.log_query_error(query_id, e),
1990+
Ok(_) => history_listener.log_request_success(request_id),
1991+
Err(e) => history_listener.log_request_error(request_id, e),
19941992
}
19951993
}
19961994

1997-
result
1995+
result.map_err(RequestError::into_query_error)
19981996
}
19991997

20001998
/// Executes the closure `run_request_once`, provided the load balancing plan and some information
@@ -2008,12 +2006,12 @@ where
20082006
run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
20092007
execution_profile: &ExecutionProfileInner,
20102008
mut context: ExecuteRequestContext<'a>,
2011-
) -> Option<Result<RunRequestResult<ResT>, QueryError>>
2009+
) -> Option<Result<RunRequestResult<ResT>, RequestError>>
20122010
where
20132011
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
20142012
ResT: AllowedRunRequestResTType,
20152013
{
2016-
let mut last_error: Option<QueryError> = None;
2014+
let mut last_error: Option<RequestError> = None;
20172015
let mut current_consistency: Consistency = context
20182016
.consistency_set_on_statement
20192017
.unwrap_or(execution_profile.consistency);
@@ -2097,12 +2095,9 @@ where
20972095
retry_decision = ?retry_decision
20982096
);
20992097

2100-
last_error = Some(request_error.into_query_error());
2101-
context.log_attempt_error(
2102-
&attempt_id,
2103-
last_error.as_ref().unwrap(),
2104-
&retry_decision,
2105-
);
2098+
context.log_attempt_error(&attempt_id, &request_error, &retry_decision);
2099+
2100+
last_error = Some(request_error.into());
21062101

21072102
match retry_decision {
21082103
RetryDecision::RetrySameNode(new_cl) => {
@@ -2142,8 +2137,8 @@ where
21422137
self.await_schema_agreement_indefinitely(),
21432138
)
21442139
.await
2145-
.unwrap_or(Err(QueryError::RequestTimeout(
2146-
"schema agreement not reached in time".to_owned(),
2140+
.unwrap_or(Err(QueryError::SchemaAgreementTimeout(
2141+
self.schema_agreement_timeout,
21472142
)))
21482143
}
21492144

@@ -2192,15 +2187,15 @@ struct ExecuteRequestContext<'a> {
21922187

21932188
struct HistoryData<'a> {
21942189
listener: &'a dyn HistoryListener,
2195-
query_id: history::QueryId,
2190+
request_id: history::RequestId,
21962191
speculative_id: Option<history::SpeculativeId>,
21972192
}
21982193

21992194
impl ExecuteRequestContext<'_> {
22002195
fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
22012196
self.history_data.as_ref().map(|hd| {
22022197
hd.listener
2203-
.log_attempt_start(hd.query_id, hd.speculative_id, node_addr)
2198+
.log_attempt_start(hd.request_id, hd.speculative_id, node_addr)
22042199
})
22052200
}
22062201

@@ -2221,7 +2216,7 @@ impl ExecuteRequestContext<'_> {
22212216
fn log_attempt_error(
22222217
&self,
22232218
attempt_id_opt: &Option<history::AttemptId>,
2224-
error: &QueryError,
2219+
error: &RequestAttemptError,
22252220
retry_decision: &RetryDecision,
22262221
) {
22272222
let attempt_id: &history::AttemptId = match attempt_id_opt {

0 commit comments

Comments
 (0)