Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/cubejs-backend-native/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,10 @@ pub async fn call_js_with_stream_as_callback(
schema: SchemaRef,
member_fields: Vec<MemberField>,
) -> Result<Receiver<Chunk>, CubeError> {
let channel_size = std::env::var("CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK")
.ok()
.map(|v| v.parse::<usize>().unwrap())
.unwrap_or(8192);
// Each chunk is a RecordBatch of up to CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK rows.
// Let's keep the size small to avoid memory issues and allow linear scaling
// of the buffer size depending on the env value.
let channel_size = 10;

let (sender, receiver) = mpsc_channel::<Chunk>(channel_size);
let (ready_sender, ready_receiver) = oneshot::channel();
Expand Down
1 change: 1 addition & 0 deletions rust/cubesql/cubesql/src/sql/postgres/extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ pub enum PortalFrom {
Extended,
}

#[derive(Debug)]
pub enum PortalBatch {
Description(protocol::RowDescription),
Rows(BatchWriter),
Expand Down
14 changes: 0 additions & 14 deletions rust/cubesql/cubesql/src/sql/postgres/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1500,20 +1500,6 @@ impl AsyncPostgresShim {
sensitive,
hold,
} => {
// TODO: move envs to config
let stream_mode = self.session.server.config_obj.stream_mode();
if stream_mode {
return Err(ConnectionError::Protocol(
protocol::ErrorResponse::error(
protocol::ErrorCode::FeatureNotSupported,
"DECLARE statement can not be used if CUBESQL_STREAM_MODE == true"
.to_string(),
)
.into(),
span_id.clone(),
));
}

// The default is to allow scrolling in some cases; this is not the same as specifying SCROLL.
if scroll.is_some() {
return Err(ConnectionError::Protocol(
Expand Down
Loading