Skip to content

Commit 5c9d92d

Browse files
committed
chore(cubesql): Improve error handling
Signed-off-by: Alex Qyoun-ae <[email protected]>
1 parent fac6d38 commit 5c9d92d

File tree

5 files changed

+124
-31
lines changed

5 files changed

+124
-31
lines changed

packages/cubejs-backend-native/Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesql/Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesql/cubesql/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ homepage = "https://cube.dev"
1010

1111
[dependencies]
1212
arc-swap = "1"
13-
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "8e60e17fb5872d50e00ee65bf2200e1ebe5122ea", default-features = false, features = [
13+
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "48ff05841a0758bba9d69c4918f724d9515d84a6", default-features = false, features = [
1414
"regex_expressions",
1515
"unicode_expressions",
1616
] }

rust/cubesql/cubesql/src/compile/engine/df/scan.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ impl ExecutionPlan for CubeScanExecutionPlan {
483483
self.member_fields.clone(),
484484
)
485485
.await;
486-
let stream = result.map_err(|err| DataFusionError::Execution(err.to_string()))?;
486+
let stream = result.map_err(|err| DataFusionError::External(Box::new(err)))?;
487487
let main_stream = CubeScanMemoryStream::new(stream);
488488

489489
return Ok(Box::pin(CubeScanStreamRouter::new(
@@ -600,7 +600,16 @@ impl CubeScanMemoryStream {
600600
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<ArrowResult<RecordBatch>>> {
601601
self.receiver.poll_recv(cx).map(|res| match res {
602602
Some(Some(Ok(chunk))) => Some(Ok(chunk)),
603-
Some(Some(Err(err))) => Some(Err(ArrowError::ComputeError(err.to_string()))),
603+
Some(Some(Err(mut err))) => {
604+
// Remove `Error: ` prefix that can come from database
605+
err.message = if let Some(message) = err.message.strip_prefix("Error: ") {
606+
message.to_string()
607+
} else {
608+
err.message
609+
};
610+
err.message = format!("Database Execution Error: {}", err.message);
611+
Some(Err(ArrowError::ExternalError(Box::new(err))))
612+
}
604613
Some(None) => None,
605614
None => None,
606615
})
@@ -637,9 +646,9 @@ impl Stream for CubeScanStreamRouter {
637646
match &mut self.main_stream {
638647
Some(main_stream) => {
639648
let next = main_stream.poll_next(cx);
640-
if let Poll::Ready(Some(Err(ArrowError::ComputeError(err)))) = &next {
649+
if let Poll::Ready(Some(Err(ArrowError::ExternalError(err)))) = &next {
641650
if err
642-
.as_str()
651+
.to_string()
643652
.contains("streamQuery() method is not implemented yet")
644653
{
645654
warn!("{}", err);
@@ -697,7 +706,7 @@ async fn load_data(
697706

698707
let mut response = JsonValueObject::new(data);
699708
let rec = transform_response(&mut response, schema.clone(), &member_fields)
700-
.map_err(|e| DataFusionError::Execution(e.message.to_string()))?;
709+
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
701710

702711
rec
703712
} else {
@@ -713,21 +722,38 @@ async fn load_data(
713722
options.cache_mode,
714723
)
715724
.await
716-
.map_err(|err| ArrowError::ComputeError(err.to_string()))?;
725+
.map_err(|mut err| {
726+
// Remove `Error: ` prefix that can come from database
727+
err.message = if let Some(message) = err.message.strip_prefix("Error: ") {
728+
message.to_string()
729+
} else {
730+
err.message
731+
};
732+
err.message = format!("Database Execution Error: {}", err.message);
733+
ArrowError::ExternalError(Box::new(err))
734+
})?;
717735
let response = result.first();
718736
if let Some(data) = response.cloned() {
719737
match (options.max_records, data.num_rows()) {
720738
(Some(max_records), len) if len >= max_records => {
721-
return Err(ArrowError::ComputeError(format!("One of the Cube queries exceeded the maximum row limit ({}). JOIN/UNION is not possible as it will produce incorrect results. Try filtering the results more precisely or moving post-processing functions to an outer query.", max_records)));
739+
return Err(ArrowError::ExternalError(Box::new(CubeError::user(
740+
format!(
741+
"One of the Cube queries exceeded the maximum row limit ({}). \
742+
JOIN/UNION is not possible as it will produce incorrect results. \
743+
Try filtering the results more precisely \
744+
or moving post-processing functions to an outer query.",
745+
max_records
746+
),
747+
))));
722748
}
723749
(_, _) => (),
724750
}
725751

726752
data
727753
} else {
728-
return Err(ArrowError::ComputeError(
754+
return Err(ArrowError::ExternalError(Box::new(CubeError::internal(
729755
"Unable to extract results from response: results is empty".to_string(),
730-
));
756+
))));
731757
}
732758
};
733759

rust/cubesql/cubesql/src/sql/postgres/shim.rs

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::{
2222
transport::{MetaContext, SpanId},
2323
CubeError,
2424
};
25+
use datafusion::{arrow::error::ArrowError, error::DataFusionError};
2526
use futures::{FutureExt, StreamExt};
2627
use log::{debug, error, trace};
2728
use pg_srv::{
@@ -108,6 +109,10 @@ impl QueryPlanExt for QueryPlan {
108109
pub enum ConnectionError {
109110
#[error("CubeError: {0}")]
110111
Cube(CubeError, Option<Arc<SpanId>>),
112+
#[error("DataFusionError: {0}")]
113+
DataFusion(DataFusionError, Option<Arc<SpanId>>),
114+
#[error("ArrowError: {0}")]
115+
Arrow(ArrowError, Option<Arc<SpanId>>),
111116
#[error("CompilationError: {0}")]
112117
CompilationError(CompilationError, Option<Arc<SpanId>>),
113118
#[error("ProtocolError: {0}")]
@@ -121,15 +126,16 @@ impl ConnectionError {
121126
ConnectionError::Cube(e, _) => e.backtrace(),
122127
ConnectionError::CompilationError(e, _) => e.backtrace(),
123128
ConnectionError::Protocol(e, _) => e.backtrace(),
129+
ConnectionError::DataFusion(_, _) | ConnectionError::Arrow(_, _) => None,
124130
}
125131
}
126132

127133
/// Converts Error to protocol::ErrorResponse, which is useful for writing a response to the client
128134
pub fn to_error_response(self) -> protocol::ErrorResponse {
129135
match self {
130-
ConnectionError::Cube(e, _) => {
131-
protocol::ErrorResponse::error(protocol::ErrorCode::InternalError, e.to_string())
132-
}
136+
ConnectionError::Cube(e, _) => Self::cube_to_error_response(&e),
137+
ConnectionError::DataFusion(e, _) => Self::df_to_error_response(&e),
138+
ConnectionError::Arrow(e, _) => Self::arrow_to_error_response(&e),
133139
ConnectionError::CompilationError(e, _) => {
134140
fn to_error_response(e: CompilationError) -> protocol::ErrorResponse {
135141
match e {
@@ -161,6 +167,8 @@ impl ConnectionError {
161167
pub fn with_span_id(self, span_id: Option<Arc<SpanId>>) -> Self {
162168
match self {
163169
ConnectionError::Cube(e, _) => ConnectionError::Cube(e, span_id),
170+
ConnectionError::DataFusion(e, _) => ConnectionError::DataFusion(e, span_id),
171+
ConnectionError::Arrow(e, _) => ConnectionError::Arrow(e, span_id),
164172
ConnectionError::CompilationError(e, _) => {
165173
ConnectionError::CompilationError(e, span_id)
166174
}
@@ -171,10 +179,59 @@ impl ConnectionError {
171179
pub fn span_id(&self) -> Option<Arc<SpanId>> {
172180
match self {
173181
ConnectionError::Cube(_, span_id) => span_id.clone(),
182+
ConnectionError::DataFusion(_, span_id) => span_id.clone(),
183+
ConnectionError::Arrow(_, span_id) => span_id.clone(),
174184
ConnectionError::CompilationError(_, span_id) => span_id.clone(),
175185
ConnectionError::Protocol(_, span_id) => span_id.clone(),
176186
}
177187
}
188+
189+
fn cube_to_error_response(e: &CubeError) -> protocol::ErrorResponse {
190+
let message = e.to_string();
191+
// Remove `Error: ` prefix that can come from JS
192+
let message = if let Some(message) = message.strip_prefix("Error: ") {
193+
message.to_string()
194+
} else {
195+
message
196+
};
197+
protocol::ErrorResponse::error(protocol::ErrorCode::InternalError, message)
198+
}
199+
200+
fn df_to_error_response(e: &DataFusionError) -> protocol::ErrorResponse {
201+
match e {
202+
DataFusionError::ArrowError(arrow_err) => {
203+
return Self::arrow_to_error_response(arrow_err);
204+
}
205+
DataFusionError::External(err) => {
206+
if let Some(cube_err) = err.downcast_ref::<CubeError>() {
207+
return Self::cube_to_error_response(cube_err);
208+
}
209+
}
210+
_ => {}
211+
}
212+
protocol::ErrorResponse::error(
213+
protocol::ErrorCode::InternalError,
214+
format!("Post-processing Error: {}", e),
215+
)
216+
}
217+
218+
fn arrow_to_error_response(e: &ArrowError) -> protocol::ErrorResponse {
219+
match e {
220+
ArrowError::ExternalError(err) => {
221+
if let Some(df_err) = err.downcast_ref::<DataFusionError>() {
222+
return Self::df_to_error_response(df_err);
223+
}
224+
if let Some(cube_err) = err.downcast_ref::<CubeError>() {
225+
return Self::cube_to_error_response(cube_err);
226+
}
227+
}
228+
_ => {}
229+
}
230+
protocol::ErrorResponse::error(
231+
protocol::ErrorCode::InternalError,
232+
format!("Post-processing Error: {}", e),
233+
)
234+
}
178235
}
179236

180237
impl From<CubeError> for ConnectionError {
@@ -201,15 +258,15 @@ impl From<tokio::task::JoinError> for ConnectionError {
201258
}
202259
}
203260

204-
impl From<datafusion::error::DataFusionError> for ConnectionError {
205-
fn from(e: datafusion::error::DataFusionError) -> Self {
206-
ConnectionError::Cube(e.into(), None)
261+
impl From<DataFusionError> for ConnectionError {
262+
fn from(e: DataFusionError) -> Self {
263+
ConnectionError::DataFusion(e, None)
207264
}
208265
}
209266

210-
impl From<datafusion::arrow::error::ArrowError> for ConnectionError {
211-
fn from(e: datafusion::arrow::error::ArrowError) -> Self {
212-
ConnectionError::Cube(e.into(), None)
267+
impl From<ArrowError> for ConnectionError {
268+
fn from(e: ArrowError) -> Self {
269+
ConnectionError::Arrow(e, None)
213270
}
214271
}
215272

@@ -1904,3 +1961,13 @@ impl AsyncPostgresShim {
19041961
.ok_or(CubeError::internal("must be auth".to_string()))
19051962
}
19061963
}
1964+
1965+
#[cfg(test)]
1966+
mod tests {
1967+
use super::*;
1968+
1969+
#[test]
1970+
fn test_connection_error_mem_size() {
1971+
assert_eq!(std::mem::size_of::<ConnectionError>(), 136)
1972+
}
1973+
}

0 commit comments

Comments
 (0)