diff --git a/packages/cubejs-backend-native/Cargo.lock b/packages/cubejs-backend-native/Cargo.lock index 0c80dd37df533..ef76695ee3420 100644 --- a/packages/cubejs-backend-native/Cargo.lock +++ b/packages/cubejs-backend-native/Cargo.lock @@ -667,7 +667,7 @@ dependencies = [ [[package]] name = "cube-ext" version = "1.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "arrow", "chrono", @@ -840,7 +840,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "datafusion" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "ahash 0.7.8", "arrow", @@ -873,7 +873,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "arrow", "ordered-float 2.10.1", @@ -884,7 +884,7 @@ dependencies = [ [[package]] name = "datafusion-data-access" version = "1.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "async-trait", "chrono", @@ -897,7 +897,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "ahash 0.7.8", "arrow", @@ -908,7 +908,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "ahash 0.7.8", "arrow", diff --git a/rust/cubesql/Cargo.lock b/rust/cubesql/Cargo.lock index 31690f3dfed5c..8a353440dcfae 100644 --- a/rust/cubesql/Cargo.lock +++ b/rust/cubesql/Cargo.lock @@ -697,7 +697,7 @@ dependencies = [ [[package]] name = "cube-ext" version = "1.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "arrow", "chrono", @@ -821,7 +821,7 @@ dependencies = [ [[package]] name = "datafusion" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "ahash 0.7.8", "arrow", @@ -854,7 +854,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "arrow", "ordered-float 2.10.0", @@ -865,7 +865,7 @@ dependencies = [ [[package]] name = "datafusion-data-access" version = "1.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "async-trait", "chrono", @@ -878,7 +878,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "ahash 0.7.8", "arrow", @@ -889,7 +889,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=8e60e17fb5872d50e00ee65bf2200e1ebe5122ea#8e60e17fb5872d50e00ee65bf2200e1ebe5122ea" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=48ff05841a0758bba9d69c4918f724d9515d84a6#48ff05841a0758bba9d69c4918f724d9515d84a6" dependencies = [ "ahash 0.7.8", "arrow", diff --git a/rust/cubesql/cubesql/Cargo.toml b/rust/cubesql/cubesql/Cargo.toml index bb9c23a1aca89..83e7850db7317 100644 --- a/rust/cubesql/cubesql/Cargo.toml +++ b/rust/cubesql/cubesql/Cargo.toml @@ -10,7 +10,7 @@ homepage = "https://cube.dev" [dependencies] arc-swap = "1" -datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "8e60e17fb5872d50e00ee65bf2200e1ebe5122ea", default-features = false, features = [ +datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "48ff05841a0758bba9d69c4918f724d9515d84a6", default-features = false, features = [ "regex_expressions", "unicode_expressions", ] } diff --git a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs index 250b116afbba2..627f51573e13f 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs @@ -483,7 +483,7 @@ impl ExecutionPlan for CubeScanExecutionPlan { self.member_fields.clone(), ) .await; - let stream = result.map_err(|err| DataFusionError::Execution(err.to_string()))?; + let stream = result.map_err(|err| DataFusionError::External(Box::new(err)))?; let main_stream = CubeScanMemoryStream::new(stream); return Ok(Box::pin(CubeScanStreamRouter::new( @@ -600,7 +600,16 @@ impl CubeScanMemoryStream { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { self.receiver.poll_recv(cx).map(|res| match res { Some(Some(Ok(chunk))) => Some(Ok(chunk)), - Some(Some(Err(err))) => Some(Err(ArrowError::ComputeError(err.to_string()))), + Some(Some(Err(mut err))) => { + // Remove `Error: ` prefix that can come from database + err.message = if let Some(message) = err.message.strip_prefix("Error: ") { + message.to_string() + } else { + err.message + }; + err.message = format!("Database Execution Error: {}", err.message); + Some(Err(ArrowError::ExternalError(Box::new(err)))) + } Some(None) => None, None => None, }) @@ -637,9 +646,9 @@ impl Stream for CubeScanStreamRouter { match &mut self.main_stream { Some(main_stream) => { let next = main_stream.poll_next(cx); - if let Poll::Ready(Some(Err(ArrowError::ComputeError(err)))) = &next { + if let Poll::Ready(Some(Err(ArrowError::ExternalError(err)))) = &next { if err - .as_str() + .to_string() .contains("streamQuery() method is not implemented yet") { warn!("{}", err); @@ -697,7 +706,7 @@ async fn load_data( let mut response = JsonValueObject::new(data); let rec = transform_response(&mut response, schema.clone(), &member_fields) - .map_err(|e| DataFusionError::Execution(e.message.to_string()))?; + .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; rec } else { @@ -713,21 +722,38 @@ async fn load_data( options.cache_mode, ) .await - .map_err(|err| ArrowError::ComputeError(err.to_string()))?; + .map_err(|mut err| { + // Remove `Error: ` prefix that can come from database + err.message = if let Some(message) = err.message.strip_prefix("Error: ") { + message.to_string() + } else { + err.message + }; + err.message = format!("Database Execution Error: {}", err.message); + ArrowError::ExternalError(Box::new(err)) + })?; let response = result.first(); if let Some(data) = response.cloned() { match (options.max_records, data.num_rows()) { (Some(max_records), len) if len >= max_records => { - return Err(ArrowError::ComputeError(format!("One of the Cube queries exceeded the maximum row limit ({}). JOIN/UNION is not possible as it will produce incorrect results. Try filtering the results more precisely or moving post-processing functions to an outer query.", max_records))); + return Err(ArrowError::ExternalError(Box::new(CubeError::user( + format!( + "One of the Cube queries exceeded the maximum row limit ({}). \ + JOIN/UNION is not possible as it will produce incorrect results. \ + Try filtering the results more precisely \ + or moving post-processing functions to an outer query.", + max_records + ), + )))); } (_, _) => (), } data } else { - return Err(ArrowError::ComputeError( + return Err(ArrowError::ExternalError(Box::new(CubeError::internal( "Unable to extract results from response: results is empty".to_string(), - )); + )))); } }; diff --git a/rust/cubesql/cubesql/src/sql/postgres/shim.rs b/rust/cubesql/cubesql/src/sql/postgres/shim.rs index d8696baac2d05..f6ae2cc36820d 100644 --- a/rust/cubesql/cubesql/src/sql/postgres/shim.rs +++ b/rust/cubesql/cubesql/src/sql/postgres/shim.rs @@ -22,6 +22,7 @@ use crate::{ transport::{MetaContext, SpanId}, CubeError, }; +use datafusion::{arrow::error::ArrowError, error::DataFusionError}; use futures::{FutureExt, StreamExt}; use log::{debug, error, trace}; use pg_srv::{ @@ -108,6 +109,10 @@ impl QueryPlanExt for QueryPlan { pub enum ConnectionError { #[error("CubeError: {0}")] Cube(CubeError, Option>), + #[error("DataFusionError: {0}")] + DataFusion(DataFusionError, Option>), + #[error("ArrowError: {0}")] + Arrow(ArrowError, Option>), #[error("CompilationError: {0}")] CompilationError(CompilationError, Option>), #[error("ProtocolError: {0}")] @@ -121,15 +126,16 @@ impl ConnectionError { ConnectionError::Cube(e, _) => e.backtrace(), ConnectionError::CompilationError(e, _) => e.backtrace(), ConnectionError::Protocol(e, _) => e.backtrace(), + ConnectionError::DataFusion(_, _) | ConnectionError::Arrow(_, _) => None, } } /// Converts Error to protocol::ErrorResponse which is usefully for writing response to the client pub fn to_error_response(self) -> protocol::ErrorResponse { match self { - ConnectionError::Cube(e, _) => { - protocol::ErrorResponse::error(protocol::ErrorCode::InternalError, e.to_string()) - } + ConnectionError::Cube(e, _) => Self::cube_to_error_response(&e), + ConnectionError::DataFusion(e, _) => Self::df_to_error_response(&e), + ConnectionError::Arrow(e, _) => Self::arrow_to_error_response(&e), ConnectionError::CompilationError(e, _) => { fn to_error_response(e: CompilationError) -> protocol::ErrorResponse { match e { @@ -161,6 +167,8 @@ impl ConnectionError { pub fn with_span_id(self, span_id: Option>) -> Self { match self { ConnectionError::Cube(e, _) => ConnectionError::Cube(e, span_id), + ConnectionError::DataFusion(e, _) => ConnectionError::DataFusion(e, span_id), + ConnectionError::Arrow(e, _) => ConnectionError::Arrow(e, span_id), ConnectionError::CompilationError(e, _) => { ConnectionError::CompilationError(e, span_id) } @@ -171,10 +179,59 @@ impl ConnectionError { pub fn span_id(&self) -> Option> { match self { ConnectionError::Cube(_, span_id) => span_id.clone(), + ConnectionError::DataFusion(_, span_id) => span_id.clone(), + ConnectionError::Arrow(_, span_id) => span_id.clone(), ConnectionError::CompilationError(_, span_id) => span_id.clone(), ConnectionError::Protocol(_, span_id) => span_id.clone(), } } + + fn cube_to_error_response(e: &CubeError) -> protocol::ErrorResponse { + let message = e.to_string(); + // Remove `Error: ` prefix that can come from JS + let message = if let Some(message) = message.strip_prefix("Error: ") { + message.to_string() + } else { + message + }; + protocol::ErrorResponse::error(protocol::ErrorCode::InternalError, message) + } + + fn df_to_error_response(e: &DataFusionError) -> protocol::ErrorResponse { + match e { + DataFusionError::ArrowError(arrow_err) => { + return Self::arrow_to_error_response(arrow_err); + } + DataFusionError::External(err) => { + if let Some(cube_err) = err.downcast_ref::() { + return Self::cube_to_error_response(cube_err); + } + } + _ => {} + } + protocol::ErrorResponse::error( + protocol::ErrorCode::InternalError, + format!("Post-processing Error: {}", e), + ) + } + + fn arrow_to_error_response(e: &ArrowError) -> protocol::ErrorResponse { + match e { + ArrowError::ExternalError(err) => { + if let Some(df_err) = err.downcast_ref::() { + return Self::df_to_error_response(df_err); + } + if let Some(cube_err) = err.downcast_ref::() { + return Self::cube_to_error_response(cube_err); + } + } + _ => {} + } + protocol::ErrorResponse::error( + protocol::ErrorCode::InternalError, + format!("Post-processing Error: {}", e), + ) + } } impl From for ConnectionError { @@ -201,15 +258,15 @@ impl From for ConnectionError { } } -impl From for ConnectionError { - fn from(e: datafusion::error::DataFusionError) -> Self { - ConnectionError::Cube(e.into(), None) +impl From for ConnectionError { + fn from(e: DataFusionError) -> Self { + ConnectionError::DataFusion(e, None) } } -impl From for ConnectionError { - fn from(e: datafusion::arrow::error::ArrowError) -> Self { - ConnectionError::Cube(e.into(), None) +impl From for ConnectionError { + fn from(e: ArrowError) -> Self { + ConnectionError::Arrow(e, None) } } @@ -1904,3 +1961,13 @@ impl AsyncPostgresShim { .ok_or(CubeError::internal("must be auth".to_string())) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_connection_error_mem_size() { + assert_eq!(std::mem::size_of::(), 136) + } +}