Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions packages/cubejs-backend-native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions rust/cubesql/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/cubesql/cubesql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ homepage = "https://cube.dev"

[dependencies]
arc-swap = "1"
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "8e60e17fb5872d50e00ee65bf2200e1ebe5122ea", default-features = false, features = [
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "48ff05841a0758bba9d69c4918f724d9515d84a6", default-features = false, features = [
"regex_expressions",
"unicode_expressions",
] }
Expand Down
44 changes: 35 additions & 9 deletions rust/cubesql/cubesql/src/compile/engine/df/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ impl ExecutionPlan for CubeScanExecutionPlan {
self.member_fields.clone(),
)
.await;
let stream = result.map_err(|err| DataFusionError::Execution(err.to_string()))?;
let stream = result.map_err(|err| DataFusionError::External(Box::new(err)))?;
let main_stream = CubeScanMemoryStream::new(stream);

return Ok(Box::pin(CubeScanStreamRouter::new(
Expand Down Expand Up @@ -600,7 +600,16 @@ impl CubeScanMemoryStream {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<ArrowResult<RecordBatch>>> {
self.receiver.poll_recv(cx).map(|res| match res {
Some(Some(Ok(chunk))) => Some(Ok(chunk)),
Some(Some(Err(err))) => Some(Err(ArrowError::ComputeError(err.to_string()))),
Some(Some(Err(mut err))) => {
// Remove `Error: ` prefix that can come from database
err.message = if let Some(message) = err.message.strip_prefix("Error: ") {
message.to_string()
} else {
err.message
};
err.message = format!("Database Execution Error: {}", err.message);
Some(Err(ArrowError::ExternalError(Box::new(err))))
}
Some(None) => None,
None => None,
})
Expand Down Expand Up @@ -637,9 +646,9 @@ impl Stream for CubeScanStreamRouter {
match &mut self.main_stream {
Some(main_stream) => {
let next = main_stream.poll_next(cx);
if let Poll::Ready(Some(Err(ArrowError::ComputeError(err)))) = &next {
if let Poll::Ready(Some(Err(ArrowError::ExternalError(err)))) = &next {
if err
.as_str()
.to_string()
.contains("streamQuery() method is not implemented yet")
{
warn!("{}", err);
Expand Down Expand Up @@ -697,7 +706,7 @@ async fn load_data(

let mut response = JsonValueObject::new(data);
let rec = transform_response(&mut response, schema.clone(), &member_fields)
.map_err(|e| DataFusionError::Execution(e.message.to_string()))?;
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;

rec
} else {
Expand All @@ -713,21 +722,38 @@ async fn load_data(
options.cache_mode,
)
.await
.map_err(|err| ArrowError::ComputeError(err.to_string()))?;
.map_err(|mut err| {
// Remove `Error: ` prefix that can come from database
err.message = if let Some(message) = err.message.strip_prefix("Error: ") {
message.to_string()
} else {
err.message
};
err.message = format!("Database Execution Error: {}", err.message);
ArrowError::ExternalError(Box::new(err))
})?;
let response = result.first();
if let Some(data) = response.cloned() {
match (options.max_records, data.num_rows()) {
(Some(max_records), len) if len >= max_records => {
return Err(ArrowError::ComputeError(format!("One of the Cube queries exceeded the maximum row limit ({}). JOIN/UNION is not possible as it will produce incorrect results. Try filtering the results more precisely or moving post-processing functions to an outer query.", max_records)));
return Err(ArrowError::ExternalError(Box::new(CubeError::user(
format!(
"One of the Cube queries exceeded the maximum row limit ({}). \
JOIN/UNION is not possible as it will produce incorrect results. \
Try filtering the results more precisely \
or moving post-processing functions to an outer query.",
max_records
),
))));
}
(_, _) => (),
}

data
} else {
return Err(ArrowError::ComputeError(
return Err(ArrowError::ExternalError(Box::new(CubeError::internal(
"Unable to extract results from response: results is empty".to_string(),
));
))));
}
};

Expand Down
85 changes: 76 additions & 9 deletions rust/cubesql/cubesql/src/sql/postgres/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
transport::{MetaContext, SpanId},
CubeError,
};
use datafusion::{arrow::error::ArrowError, error::DataFusionError};
use futures::{FutureExt, StreamExt};
use log::{debug, error, trace};
use pg_srv::{
Expand Down Expand Up @@ -108,6 +109,10 @@ impl QueryPlanExt for QueryPlan {
pub enum ConnectionError {
#[error("CubeError: {0}")]
Cube(CubeError, Option<Arc<SpanId>>),
#[error("DataFusionError: {0}")]
DataFusion(DataFusionError, Option<Arc<SpanId>>),
Copy link
Member

@ovr ovr Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, we need to box DataFusionError, ArrowError. I assume it's a large enum. Can we add 1-liner tests with assert_eq for mem::size_of<ConnectionError>. Thanks

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested this and CubeError is the variant of biggest size. DataFusionError and ArrowError actually happen to be the smallest, so I avoided boxing them in the end. I added the mem::size_of test to verify this.

#[error("ArrowError: {0}")]
Arrow(ArrowError, Option<Arc<SpanId>>),
#[error("CompilationError: {0}")]
CompilationError(CompilationError, Option<Arc<SpanId>>),
#[error("ProtocolError: {0}")]
Expand All @@ -121,15 +126,16 @@ impl ConnectionError {
ConnectionError::Cube(e, _) => e.backtrace(),
ConnectionError::CompilationError(e, _) => e.backtrace(),
ConnectionError::Protocol(e, _) => e.backtrace(),
ConnectionError::DataFusion(_, _) | ConnectionError::Arrow(_, _) => None,
}
}

/// Converts Error to protocol::ErrorResponse which is usefully for writing response to the client
pub fn to_error_response(self) -> protocol::ErrorResponse {
match self {
ConnectionError::Cube(e, _) => {
protocol::ErrorResponse::error(protocol::ErrorCode::InternalError, e.to_string())
}
ConnectionError::Cube(e, _) => Self::cube_to_error_response(&e),
ConnectionError::DataFusion(e, _) => Self::df_to_error_response(&e),
ConnectionError::Arrow(e, _) => Self::arrow_to_error_response(&e),
ConnectionError::CompilationError(e, _) => {
fn to_error_response(e: CompilationError) -> protocol::ErrorResponse {
match e {
Expand Down Expand Up @@ -161,6 +167,8 @@ impl ConnectionError {
pub fn with_span_id(self, span_id: Option<Arc<SpanId>>) -> Self {
match self {
ConnectionError::Cube(e, _) => ConnectionError::Cube(e, span_id),
ConnectionError::DataFusion(e, _) => ConnectionError::DataFusion(e, span_id),
ConnectionError::Arrow(e, _) => ConnectionError::Arrow(e, span_id),
ConnectionError::CompilationError(e, _) => {
ConnectionError::CompilationError(e, span_id)
}
Expand All @@ -171,10 +179,59 @@ impl ConnectionError {
pub fn span_id(&self) -> Option<Arc<SpanId>> {
match self {
ConnectionError::Cube(_, span_id) => span_id.clone(),
ConnectionError::DataFusion(_, span_id) => span_id.clone(),
ConnectionError::Arrow(_, span_id) => span_id.clone(),
ConnectionError::CompilationError(_, span_id) => span_id.clone(),
ConnectionError::Protocol(_, span_id) => span_id.clone(),
}
}

fn cube_to_error_response(e: &CubeError) -> protocol::ErrorResponse {
let message = e.to_string();
// Remove `Error: ` prefix that can come from JS
let message = if let Some(message) = message.strip_prefix("Error: ") {
message.to_string()
} else {
message
};
protocol::ErrorResponse::error(protocol::ErrorCode::InternalError, message)
}

fn df_to_error_response(e: &DataFusionError) -> protocol::ErrorResponse {
match e {
DataFusionError::ArrowError(arrow_err) => {
return Self::arrow_to_error_response(arrow_err);
}
DataFusionError::External(err) => {
if let Some(cube_err) = err.downcast_ref::<CubeError>() {
return Self::cube_to_error_response(cube_err);
}
}
_ => {}
}
protocol::ErrorResponse::error(
protocol::ErrorCode::InternalError,
format!("Post-processing Error: {}", e),
)
}

fn arrow_to_error_response(e: &ArrowError) -> protocol::ErrorResponse {
match e {
ArrowError::ExternalError(err) => {
if let Some(df_err) = err.downcast_ref::<DataFusionError>() {
return Self::df_to_error_response(df_err);
}
if let Some(cube_err) = err.downcast_ref::<CubeError>() {
return Self::cube_to_error_response(cube_err);
}
}
_ => {}
}
protocol::ErrorResponse::error(
protocol::ErrorCode::InternalError,
format!("Post-processing Error: {}", e),
)
}
}

impl From<CubeError> for ConnectionError {
Expand All @@ -201,15 +258,15 @@ impl From<tokio::task::JoinError> for ConnectionError {
}
}

impl From<datafusion::error::DataFusionError> for ConnectionError {
fn from(e: datafusion::error::DataFusionError) -> Self {
ConnectionError::Cube(e.into(), None)
impl From<DataFusionError> for ConnectionError {
fn from(e: DataFusionError) -> Self {
ConnectionError::DataFusion(e, None)
}
}

impl From<datafusion::arrow::error::ArrowError> for ConnectionError {
fn from(e: datafusion::arrow::error::ArrowError) -> Self {
ConnectionError::Cube(e.into(), None)
impl From<ArrowError> for ConnectionError {
fn from(e: ArrowError) -> Self {
ConnectionError::Arrow(e, None)
}
}

Expand Down Expand Up @@ -1904,3 +1961,13 @@ impl AsyncPostgresShim {
.ok_or(CubeError::internal("must be auth".to_string()))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_connection_error_mem_size() {
assert_eq!(std::mem::size_of::<ConnectionError>(), 136)
}
}
Loading