diff --git a/packages/cubejs-backend-native/Cargo.lock b/packages/cubejs-backend-native/Cargo.lock
index 45941c342a94d..0c80dd37df533 100644
--- a/packages/cubejs-backend-native/Cargo.lock
+++ b/packages/cubejs-backend-native/Cargo.lock
@@ -749,6 +749,7 @@ dependencies = [
  "anyhow",
  "chrono",
  "cubeshared",
+ "flatbuffers 23.5.26",
  "indexmap 2.7.1",
  "itertools 0.13.0",
  "neon",
diff --git a/packages/cubejs-cubestore-driver/codegen/generate.sh b/packages/cubejs-cubestore-driver/codegen/generate.sh
index 868ea5bde5299..5013593040c20 100755
--- a/packages/cubejs-cubestore-driver/codegen/generate.sh
+++ b/packages/cubejs-cubestore-driver/codegen/generate.sh
@@ -1,4 +1,4 @@
 #!/bin/bash
 
-flatc --ts ../../../rust/cubestore/cubestore/src/codegen/http_message.fbs --ts-flat-files
+flatc --ts ../../../rust/cubeshared/src/codegen/http_message.fbs --ts-flat-files
 mv http_message.ts index.ts
diff --git a/rust/cubeorchestrator/Cargo.lock b/rust/cubeorchestrator/Cargo.lock
index 736239ae6a952..ee41e9c189688 100644
--- a/rust/cubeorchestrator/Cargo.lock
+++ b/rust/cubeorchestrator/Cargo.lock
@@ -111,6 +111,7 @@ dependencies = [
  "anyhow",
  "chrono",
  "cubeshared",
+ "flatbuffers",
  "indexmap",
  "itertools",
  "neon",
diff --git a/rust/cubeorchestrator/Cargo.toml b/rust/cubeorchestrator/Cargo.toml
index f1c400bb7bfaa..d1030cc40a1a6 100644
--- a/rust/cubeorchestrator/Cargo.toml
+++ b/rust/cubeorchestrator/Cargo.toml
@@ -11,6 +11,7 @@ serde_json = "1.0.133"
 anyhow = "1.0"
 itertools = "0.13.0"
 indexmap = { version = "2.0", features = ["serde"] }
+flatbuffers = "23.5.26"
 
 [dependencies.neon]
 version = "=1"
diff --git a/rust/cubeorchestrator/src/query_message_parser.rs b/rust/cubeorchestrator/src/query_message_parser.rs
index 1a32847126a95..989612097772c 100644
--- a/rust/cubeorchestrator/src/query_message_parser.rs
+++ b/rust/cubeorchestrator/src/query_message_parser.rs
@@ -2,7 +2,8 @@ use crate::{
     query_result_transform::{DBResponsePrimitive, DBResponseValue},
     transport::JsRawData,
 };
-use cubeshared::codegen::{root_as_http_message, HttpCommand};
+use cubeshared::codegen::{root_as_http_message_with_opts, HttpCommand};
+use flatbuffers::VerifierOptions;
 use indexmap::IndexMap;
 use neon::prelude::Finalize;
 
@@ -12,7 +13,7 @@ pub enum ParseError {
     EmptyResultSet,
     NullRow,
     ColumnNameNotDefined,
-    FlatBufferError,
+    FlatBufferError(String),
     ErrorMessage(String),
 }
 
@@ -23,7 +24,7 @@ impl std::fmt::Display for ParseError {
             ParseError::EmptyResultSet => write!(f, "Empty resultSet"),
             ParseError::NullRow => write!(f, "Null row"),
             ParseError::ColumnNameNotDefined => write!(f, "Column name is not defined"),
-            ParseError::FlatBufferError => write!(f, "FlatBuffer parsing error"),
+            ParseError::FlatBufferError(msg) => write!(f, "FlatBuffer parsing error: {}", msg),
             ParseError::ErrorMessage(msg) => write!(f, "Error: {}", msg),
         }
     }
@@ -48,14 +49,18 @@ impl QueryResult {
             columns_pos: IndexMap::new(),
         };
 
-        let http_message =
-            root_as_http_message(msg_data).map_err(|_| ParseError::FlatBufferError)?;
+        let mut opts = VerifierOptions::default();
+        opts.max_tables = 10_000_000; // Support up to 10M tables
+        opts.max_apparent_size = 1 << 31; // 2GB limit for large datasets
+
+        let http_message = root_as_http_message_with_opts(&opts, msg_data)
+            .map_err(|err| ParseError::FlatBufferError(err.to_string()))?;
 
         match http_message.command_type() {
             HttpCommand::HttpError => {
-                let http_error = http_message
-                    .command_as_http_error()
-                    .ok_or(ParseError::FlatBufferError)?;
+                let http_error = http_message.command_as_http_error().ok_or_else(|| {
+                    ParseError::FlatBufferError("Failed to parse HttpError command".to_string())
+                })?;
                 let error_message = http_error.error().unwrap_or("Unknown error").to_string();
                 Err(ParseError::ErrorMessage(error_message))
             }
@@ -145,3 +150,207 @@ impl QueryResult {
         })
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use cubeshared::codegen::{
+        root_as_http_message_unchecked, HttpColumnValue, HttpColumnValueArgs, HttpCommand,
+        HttpMessage, HttpMessageArgs, HttpResultSet, HttpResultSetArgs, HttpRow, HttpRowArgs,
+    };
+    use flatbuffers::FlatBufferBuilder;
+
+    /// Helper function to create a test HttpMessage with a given number of rows and columns
+    fn create_test_message(num_rows: usize, num_columns: usize) -> Vec<u8> {
+        let mut builder = FlatBufferBuilder::new();
+
+        // Create column names
+        let column_names: Vec<_> = (0..num_columns)
+            .map(|i| builder.create_string(&format!("column_{}", i)))
+            .collect();
+
+        // Create rows with values
+        let mut rows_vec = Vec::with_capacity(num_rows);
+        for row_idx in 0..num_rows {
+            // Create column values for this row
+            let mut values_vec = Vec::with_capacity(num_columns);
+            for col_idx in 0..num_columns {
+                let value_str = builder.create_string(&format!("row_{}_col_{}", row_idx, col_idx));
+                let col_value = HttpColumnValue::create(
+                    &mut builder,
+                    &HttpColumnValueArgs {
+                        string_value: Some(value_str),
+                    },
+                );
+                values_vec.push(col_value);
+            }
+
+            let values_vector = builder.create_vector(&values_vec);
+            let row = HttpRow::create(
+                &mut builder,
+                &HttpRowArgs {
+                    values: Some(values_vector),
+                },
+            );
+            rows_vec.push(row);
+        }
+
+        // Create the result set
+        let columns_vector = builder.create_vector(&column_names);
+        let rows_vector = builder.create_vector(&rows_vec);
+
+        let result_set = HttpResultSet::create(
+            &mut builder,
+            &HttpResultSetArgs {
+                columns: Some(columns_vector),
+                rows: Some(rows_vector),
+            },
+        );
+
+        // Create the message
+        let connection_id = builder.create_string("test_connection");
+        let message = HttpMessage::create(
+            &mut builder,
+            &HttpMessageArgs {
+                message_id: 1,
+                command_type: HttpCommand::HttpResultSet,
+                command: Some(result_set.as_union_value()),
+                connection_id: Some(connection_id),
+            },
+        );
+
+        builder.finish(message, None);
+        builder.finished_data().to_vec()
+    }
+
+    #[test]
+    fn test_parse_small_result_set() {
+        // Small result set should work fine
+        let msg_data = create_test_message(10, 5);
+        let result = QueryResult::from_cubestore_fb(&msg_data);
+        assert!(result.is_ok());
+
+        let query_result = result.unwrap();
+        assert_eq!(query_result.columns.len(), 5);
+        assert_eq!(query_result.rows.len(), 10);
+    }
+
+    #[test]
+    fn test_parse_medium_result_set() {
+        // Medium result set: 1000 rows, 20 columns
+        let msg_data = create_test_message(1000, 20);
+        let result = QueryResult::from_cubestore_fb(&msg_data);
+        assert!(result.is_ok());
+
+        let query_result = result.unwrap();
+        assert_eq!(query_result.columns.len(), 20);
+        assert_eq!(query_result.rows.len(), 1000);
+    }
+
+    #[test]
+    fn test_parse_large_result_set() {
+        // Large result set: 10,000 rows, 30 columns
+        // Verification is expected to succeed at this size
+        let msg_data = create_test_message(10_000, 30);
+        let result = QueryResult::from_cubestore_fb(&msg_data);
+        assert!(result.is_ok());
+
+        let query_result = result.unwrap();
+        assert_eq!(query_result.columns.len(), 30);
+        assert_eq!(query_result.rows.len(), 10_000);
+    }
+
+    #[test]
+    fn test_parse_very_large_result_set() {
+        // Very large result set: 33,000 rows, 40 columns
+        let msg_data = create_test_message(33_000, 40);
+        let result = QueryResult::from_cubestore_fb(&msg_data);
+        assert!(result.is_ok());
+
+        let query_result = result.unwrap();
+        assert_eq!(query_result.columns.len(), 40);
+        assert_eq!(query_result.rows.len(), 33_000);
+    }
+
+    #[test]
+    fn test_parse_huge_result_set() {
+        // Huge result set: 50,000 rows, 100 columns
+        let msg_data = create_test_message(50_000, 100);
+        let result = QueryResult::from_cubestore_fb(&msg_data);
+        assert!(result.is_ok());
+
+        let query_result = result.unwrap();
+        assert_eq!(query_result.columns.len(), 100);
+        assert_eq!(query_result.rows.len(), 50_000);
+    }
+
+    #[test]
+    fn test_compare_with_unchecked_parse() {
+        // Test to demonstrate that unchecked parsing would work
+        let msg_data = create_test_message(33_000, 40);
+
+        // Checked version (current implementation)
+        let checked_result = QueryResult::from_cubestore_fb(&msg_data);
+
+        // Try unchecked version to verify the data itself is valid
+        // SAFETY: `msg_data` was just produced by `FlatBufferBuilder` in
+        // `create_test_message`, so it holds a finished `HttpMessage` buffer.
+        let unchecked_result = unsafe {
+            let http_message = root_as_http_message_unchecked(&msg_data);
+            match http_message.command_type() {
+                HttpCommand::HttpResultSet => {
+                    let result_set = http_message.command_as_http_result_set();
+                    if let Some(rs) = result_set {
+                        if let Some(rows) = rs.rows() {
+                            println!("Unchecked parse found {} rows", rows.len());
+                            Ok(rows.len())
+                        } else {
+                            Err("No rows")
+                        }
+                    } else {
+                        Err("No result set")
+                    }
+                }
+                _ => Err("Wrong command type"),
+            }
+        };
+
+        assert!(checked_result.is_ok());
+        assert_eq!(unchecked_result, Ok(33_000));
+    }
+
+    #[test]
+    fn test_parse_with_custom_verifier_options() {
+        use cubeshared::codegen::root_as_http_message_with_opts;
+        use flatbuffers::VerifierOptions;
+
+        // Test that custom verifier options can handle large datasets
+        let msg_data = create_test_message(33_000, 40);
+
+        // Create custom verifier options with increased limits
+        let mut opts = VerifierOptions::default();
+        opts.max_tables = 10_000_000; // Support up to 10M tables
+        opts.max_apparent_size = 1 << 31; // 2GB limit
+
+        // This should succeed with custom options
+        let result = root_as_http_message_with_opts(&opts, &msg_data);
+
+        match result {
+            Ok(http_message) => match http_message.command_type() {
+                HttpCommand::HttpResultSet => {
+                    // Fail loudly instead of passing vacuously when the payload is missing
+                    let rows = http_message
+                        .command_as_http_result_set()
+                        .expect("message should carry an HttpResultSet")
+                        .rows()
+                        .expect("result set should contain rows");
+                    assert_eq!(rows.len(), 33_000);
+                }
+                _ => panic!("Wrong command type"),
+            },
+            Err(e) => {
+                panic!("Failed to parse with custom verifier options: {:?}", e);
+            }
+        }
+    }
+}