Skip to content

Commit 0c1c9e3

Browse files
committed
history: narrow the error types accepted by HistoryListener trait
- `log_attempt_error` will now accept an error representing a single request failure - namely `RequestAttemptError` - `log_query_error` will now accept RequestError, which represents a definite request failure. This is a superset of RequestAttemptError, as it also contains information about potential timeout, empty plan etc.
1 parent b249353 commit 0c1c9e3

File tree

4 files changed

+48
-48
lines changed

4 files changed

+48
-48
lines changed

scylla/src/client/pager.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,7 @@ where
235235
retry_decision = ?retry_decision
236236
);
237237

238-
// TODO: This is a temporary measure. Will be able to remove it later in this PR
239-
// once I narrow the error type in history module.
240-
let q_error: QueryError = request_error.clone().into_query_error();
241-
self.log_attempt_error(&q_error, &retry_decision);
238+
self.log_attempt_error(&request_error, &retry_decision);
242239

243240
last_error = request_error.into();
244241

@@ -269,10 +266,8 @@ where
269266
}
270267
}
271268

272-
// Send last_error to QueryPager - query failed fully
273-
let q_error = last_error.into_query_error();
274-
self.log_query_error(&q_error);
275-
let (proof, _) = self.sender.send(Err(q_error)).await;
269+
self.log_query_error(&last_error);
270+
let (proof, _) = self.sender.send(Err(last_error.into_query_error())).await;
276271
proof
277272
}
278273

@@ -420,7 +415,7 @@ where
420415
history_listener.log_query_success(query_id);
421416
}
422417

423-
fn log_query_error(&mut self, error: &QueryError) {
418+
fn log_query_error(&mut self, error: &RequestError) {
424419
let history_listener: &dyn HistoryListener = match &self.history_listener {
425420
Some(hl) => &**hl,
426421
None => return,
@@ -463,7 +458,7 @@ where
463458
history_listener.log_attempt_success(attempt_id);
464459
}
465460

466-
fn log_attempt_error(&mut self, error: &QueryError, retry_decision: &RetryDecision) {
461+
fn log_attempt_error(&mut self, error: &RequestAttemptError, retry_decision: &RetryDecision) {
467462
let history_listener: &dyn HistoryListener = match &self.history_listener {
468463
Some(hl) => &**hl,
469464
None => return,

scylla/src/client/session.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1978,9 +1978,9 @@ where
19781978
let result = match effective_timeout {
19791979
Some(timeout) => tokio::time::timeout(timeout, runner)
19801980
.await
1981-
.map(|res| res.map_err(RequestError::into_query_error))
1982-
.unwrap_or_else(|_| Err(QueryError::RequestTimeout(timeout))),
1983-
None => runner.await.map_err(RequestError::into_query_error),
1981+
.map(|res| res.map_err(RequestError::from))
1982+
.unwrap_or_else(|_| Err(RequestError::RequestTimeout(timeout))),
1983+
None => runner.await.map_err(RequestError::from),
19841984
};
19851985

19861986
if let Some((history_listener, query_id)) = history_listener_and_id {
@@ -1990,7 +1990,7 @@ where
19901990
}
19911991
}
19921992

1993-
result
1993+
result.map_err(RequestError::into_query_error)
19941994
}
19951995

19961996
/// Executes the closure `run_request_once`, provided the load balancing plan and some information
@@ -2093,10 +2093,7 @@ where
20932093
retry_decision = ?retry_decision
20942094
);
20952095

2096-
// TODO: This is a temporary measure. Will be able to remove it later in this PR
2097-
// once I narrow the error type in history module.
2098-
let q_error: QueryError = request_error.clone().into_query_error();
2099-
context.log_attempt_error(&attempt_id, &q_error, &retry_decision);
2096+
context.log_attempt_error(&attempt_id, &request_error, &retry_decision);
21002097

21012098
last_error = Some(request_error.into());
21022099

@@ -2217,7 +2214,7 @@ impl ExecuteRequestContext<'_> {
22172214
fn log_attempt_error(
22182215
&self,
22192216
attempt_id_opt: &Option<history::AttemptId>,
2220-
error: &QueryError,
2217+
error: &RequestAttemptError,
22212218
retry_decision: &RetryDecision,
22222219
) {
22232220
let attempt_id: &history::AttemptId = match attempt_id_opt {

scylla/src/observability/history.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
time::SystemTime,
88
};
99

10-
use crate::errors::QueryError;
10+
use crate::errors::{RequestAttemptError, RequestError};
1111
use crate::policies::retry::RetryDecision;
1212
use chrono::{DateTime, Utc};
1313

@@ -43,7 +43,7 @@ pub trait HistoryListener: Debug + Send + Sync {
4343
fn log_query_success(&self, query_id: QueryId);
4444

4545
/// Log that query ended with an error - called right before returning the error from Session::query_*, execute_*, etc.
46-
fn log_query_error(&self, query_id: QueryId, error: &QueryError);
46+
fn log_query_error(&self, query_id: QueryId, error: &RequestError);
4747

4848
/// Log that a new speculative fiber has started.
4949
fn log_new_speculative_fiber(&self, query_id: QueryId) -> SpeculativeId;
@@ -63,7 +63,7 @@ pub trait HistoryListener: Debug + Send + Sync {
6363
fn log_attempt_error(
6464
&self,
6565
attempt_id: AttemptId,
66-
error: &QueryError,
66+
error: &RequestAttemptError,
6767
retry_decision: &RetryDecision,
6868
);
6969
}
@@ -89,11 +89,11 @@ pub struct HistoryCollectorData {
8989
pub enum HistoryEvent {
9090
NewQuery(QueryId),
9191
QuerySuccess(QueryId),
92-
QueryError(QueryId, QueryError),
92+
QueryError(QueryId, RequestError),
9393
NewSpeculativeFiber(SpeculativeId, QueryId),
9494
NewAttempt(AttemptId, QueryId, Option<SpeculativeId>, SocketAddr),
9595
AttemptSuccess(AttemptId),
96-
AttemptError(AttemptId, QueryError, RetryDecision),
96+
AttemptError(AttemptId, RequestAttemptError, RetryDecision),
9797
}
9898

9999
impl HistoryCollectorData {
@@ -189,7 +189,7 @@ impl HistoryListener for HistoryCollector {
189189
})
190190
}
191191

192-
fn log_query_error(&self, query_id: QueryId, error: &QueryError) {
192+
fn log_query_error(&self, query_id: QueryId, error: &RequestError) {
193193
self.do_with_data(|data| data.add_event(HistoryEvent::QueryError(query_id, error.clone())))
194194
}
195195

@@ -231,7 +231,7 @@ impl HistoryListener for HistoryCollector {
231231
fn log_attempt_error(
232232
&self,
233233
attempt_id: AttemptId,
234-
error: &QueryError,
234+
error: &RequestAttemptError,
235235
retry_decision: &RetryDecision,
236236
) {
237237
self.do_with_data(|data| {
@@ -264,7 +264,7 @@ pub struct QueryHistory {
264264
#[derive(Debug, Clone)]
265265
pub enum QueryHistoryResult {
266266
Success(TimePoint),
267-
Error(TimePoint, QueryError),
267+
Error(TimePoint, RequestError),
268268
}
269269

270270
#[derive(Debug, Clone)]
@@ -283,7 +283,7 @@ pub struct AttemptHistory {
283283
#[derive(Debug, Clone)]
284284
pub enum AttemptResult {
285285
Success(TimePoint),
286-
Error(TimePoint, QueryError, RetryDecision),
286+
Error(TimePoint, RequestAttemptError, RetryDecision),
287287
}
288288

289289
impl From<&HistoryCollectorData> for StructuredHistory {
@@ -453,7 +453,7 @@ mod tests {
453453
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
454454

455455
use crate::{
456-
errors::{DbError, QueryError},
456+
errors::{DbError, RequestAttemptError, RequestError},
457457
policies::retry::RetryDecision,
458458
test_utils::setup_tracing,
459459
};
@@ -464,7 +464,7 @@ mod tests {
464464
};
465465
use assert_matches::assert_matches;
466466
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
467-
use scylla_cql::Consistency;
467+
use scylla_cql::{frame::response::CqlResponseKind, Consistency};
468468

469469
// Set a single time for all timestamps within StructuredHistory.
470470
// HistoryCollector sets the timestamp to current time which changes with each test.
@@ -516,12 +516,12 @@ mod tests {
516516
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3)), 19042)
517517
}
518518

519-
fn timeout_error() -> QueryError {
520-
QueryError::TimeoutError
519+
fn unexpected_response(kind: CqlResponseKind) -> RequestAttemptError {
520+
RequestAttemptError::UnexpectedResponse(kind)
521521
}
522522

523-
fn unavailable_error() -> QueryError {
524-
QueryError::DbError(
523+
fn unavailable_error() -> RequestAttemptError {
524+
RequestAttemptError::DbError(
525525
DbError::Unavailable {
526526
consistency: Consistency::Quorum,
527527
required: 2,
@@ -531,8 +531,8 @@ mod tests {
531531
)
532532
}
533533

534-
fn no_stream_id_error() -> QueryError {
535-
QueryError::UnableToAllocStreamId
534+
fn no_stream_id_error() -> RequestAttemptError {
535+
RequestAttemptError::UnableToAllocStreamId
536536
}
537537

538538
#[test]
@@ -619,7 +619,7 @@ mod tests {
619619
history_collector.log_attempt_start(query_id, None, node1_addr());
620620
history_collector.log_attempt_error(
621621
attempt_id,
622-
&QueryError::TimeoutError,
622+
&unexpected_response(CqlResponseKind::Ready),
623623
&RetryDecision::RetrySameNode(Some(Consistency::Quorum)),
624624
);
625625

@@ -631,7 +631,10 @@ mod tests {
631631
&RetryDecision::DontRetry,
632632
);
633633

634-
history_collector.log_query_error(query_id, &unavailable_error());
634+
history_collector.log_query_error(
635+
query_id,
636+
&RequestError::LastAttemptError(unavailable_error()),
637+
);
635638

636639
let history: StructuredHistory = history_collector.clone_structured_history();
637640

@@ -643,7 +646,7 @@ mod tests {
643646
| - Attempt #0 sent to 127.0.0.1:19042
644647
| request send time: 2022-02-22 20:22:22 UTC
645648
| Error at 2022-02-22 20:22:22 UTC
646-
| Error: Timeout Error
649+
| Error: Received unexpected response from the server: READY. Expected RESULT or ERROR response.
647650
| Retry decision: RetrySameNode(Some(Quorum))
648651
|
649652
| - Attempt #1 sent to 127.0.0.1:19042
@@ -717,7 +720,7 @@ mod tests {
717720

718721
history_collector.log_attempt_error(
719722
attempt1,
720-
&timeout_error(),
723+
&unexpected_response(CqlResponseKind::Event),
721724
&RetryDecision::RetryNextNode(Some(Consistency::Quorum)),
722725
);
723726
let _attempt2: AttemptId =
@@ -760,7 +763,7 @@ mod tests {
760763
| - Attempt #0 sent to 127.0.0.1:19042
761764
| request send time: 2022-02-22 20:22:22 UTC
762765
| Error at 2022-02-22 20:22:22 UTC
763-
| Error: Timeout Error
766+
| Error: Received unexpected response from the server: EVENT. Expected RESULT or ERROR response.
764767
| Retry decision: RetryNextNode(Some(Quorum))
765768
|
766769
| - Attempt #1 sent to 127.0.0.3:19042
@@ -816,7 +819,7 @@ mod tests {
816819
history_collector.log_attempt_start(query1_id, None, node1_addr());
817820
history_collector.log_attempt_error(
818821
query1_attempt1,
819-
&timeout_error(),
822+
&unexpected_response(CqlResponseKind::Supported),
820823
&RetryDecision::RetryNextNode(Some(Consistency::Quorum)),
821824
);
822825
let query1_attempt2: AttemptId =
@@ -839,7 +842,7 @@ mod tests {
839842
| - Attempt #0 sent to 127.0.0.1:19042
840843
| request send time: 2022-02-22 20:22:22 UTC
841844
| Error at 2022-02-22 20:22:22 UTC
842-
| Error: Timeout Error
845+
| Error: Received unexpected response from the server: SUPPORTED. Expected RESULT or ERROR response.
843846
| Retry decision: RetryNextNode(Some(Quorum))
844847
|
845848
| - Attempt #1 sent to 127.0.0.2:19042

scylla/tests/integration/history.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33

44
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
55
use futures::StreamExt;
6-
use scylla::errors::QueryError;
6+
use scylla::errors::{RequestAttemptError, RequestError};
77
use scylla::frame::response::result::Row;
88
use scylla::observability::history::{
99
AttemptResult, HistoryCollector, QueryHistoryResult, StructuredHistory, TimePoint,
@@ -73,22 +73,27 @@ fn set_one_node(mut history: StructuredHistory) -> StructuredHistory {
7373
// The error message changes between Scylla/Cassandra/their versions.
7474
// Setting it to one value makes it possible to run tests consistently.
7575
fn set_one_db_error_message(mut history: StructuredHistory) -> StructuredHistory {
76-
let set_msg = |err: &mut QueryError| {
77-
if let QueryError::DbError(_, msg) = err {
76+
let set_msg_attempt = |err: &mut RequestAttemptError| {
77+
if let RequestAttemptError::DbError(_, msg) = err {
78+
*msg = "Error message from database".to_string();
79+
}
80+
};
81+
let set_msg_request_error = |err: &mut RequestError| {
82+
if let RequestError::LastAttemptError(RequestAttemptError::DbError(_, msg)) = err {
7883
*msg = "Error message from database".to_string();
7984
}
8085
};
8186

8287
for query in &mut history.queries {
8388
if let Some(QueryHistoryResult::Error(_, err)) = &mut query.result {
84-
set_msg(err);
89+
set_msg_request_error(err);
8590
}
8691
for fiber in std::iter::once(&mut query.non_speculative_fiber)
8792
.chain(query.speculative_fibers.iter_mut())
8893
{
8994
for attempt in &mut fiber.attempts {
9095
if let Some(AttemptResult::Error(_, err, _)) = &mut attempt.result {
91-
set_msg(err);
96+
set_msg_attempt(err);
9297
}
9398
}
9499
}

0 commit comments

Comments
 (0)