diff --git a/packages/cubejs-backend-native/src/stream.rs b/packages/cubejs-backend-native/src/stream.rs index 9c7c45f2dcbbb..9ba7168c27166 100644 --- a/packages/cubejs-backend-native/src/stream.rs +++ b/packages/cubejs-backend-native/src/stream.rs @@ -312,10 +312,10 @@ pub async fn call_js_with_stream_as_callback( schema: SchemaRef, member_fields: Vec, ) -> Result, CubeError> { - let channel_size = std::env::var("CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK") - .ok() - .map(|v| v.parse::().unwrap()) - .unwrap_or(8192); + // Each chunk is a RecordBatch of up to CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK rows. + // Let's keep the size small to avoid memory issues and allow linear scaling + // of the buffer size depending on the env value. + let channel_size = 10; let (sender, receiver) = mpsc_channel::(channel_size); let (ready_sender, ready_receiver) = oneshot::channel(); diff --git a/rust/cubesql/cubesql/src/sql/postgres/extended.rs b/rust/cubesql/cubesql/src/sql/postgres/extended.rs index 13d18c80fc1d5..dcec115cc87b1 100644 --- a/rust/cubesql/cubesql/src/sql/postgres/extended.rs +++ b/rust/cubesql/cubesql/src/sql/postgres/extended.rs @@ -201,6 +201,7 @@ pub enum PortalFrom { Extended, } +#[derive(Debug)] pub enum PortalBatch { Description(protocol::RowDescription), Rows(BatchWriter), diff --git a/rust/cubesql/cubesql/src/sql/postgres/shim.rs b/rust/cubesql/cubesql/src/sql/postgres/shim.rs index adf1c53a1f11b..e534087f70578 100644 --- a/rust/cubesql/cubesql/src/sql/postgres/shim.rs +++ b/rust/cubesql/cubesql/src/sql/postgres/shim.rs @@ -1500,20 +1500,6 @@ impl AsyncPostgresShim { sensitive, hold, } => { - // TODO: move envs to config - let stream_mode = self.session.server.config_obj.stream_mode(); - if stream_mode { - return Err(ConnectionError::Protocol( - protocol::ErrorResponse::error( - protocol::ErrorCode::FeatureNotSupported, - "DECLARE statement can not be used if CUBESQL_STREAM_MODE == true" - .to_string(), - ) - .into(), - span_id.clone(), - )); - } - // The default is to allow scrolling in some cases; this is not the same as specifying SCROLL. if scroll.is_some() { return Err(ConnectionError::Protocol(