Skip to content

Commit 8ddaba5

Browse files
authored
feat(cubesql): Support cursors in stream mode (#9877)
Signed-off-by: Alex Qyoun-ae <[email protected]>
1 parent 26e477b commit 8ddaba5

File tree

3 files changed

+5
-18
lines changed

3 files changed

+5
-18
lines changed

packages/cubejs-backend-native/src/stream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -312,10 +312,10 @@ pub async fn call_js_with_stream_as_callback(
312312
schema: SchemaRef,
313313
member_fields: Vec<MemberField>,
314314
) -> Result<Receiver<Chunk>, CubeError> {
315-
let channel_size = std::env::var("CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK")
316-
.ok()
317-
.map(|v| v.parse::<usize>().unwrap())
318-
.unwrap_or(8192);
315+
// Each chunk is a RecordBatch of up to CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK rows.
316+
// Let's keep the size small to avoid memory issues and allow linear scaling
317+
// of the buffer size depending on the env value.
318+
let channel_size = 10;
319319

320320
let (sender, receiver) = mpsc_channel::<Chunk>(channel_size);
321321
let (ready_sender, ready_receiver) = oneshot::channel();

rust/cubesql/cubesql/src/sql/postgres/extended.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ pub enum PortalFrom {
201201
Extended,
202202
}
203203

204+
#[derive(Debug)]
204205
pub enum PortalBatch {
205206
Description(protocol::RowDescription),
206207
Rows(BatchWriter),

rust/cubesql/cubesql/src/sql/postgres/shim.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1500,20 +1500,6 @@ impl AsyncPostgresShim {
15001500
sensitive,
15011501
hold,
15021502
} => {
1503-
// TODO: move envs to config
1504-
let stream_mode = self.session.server.config_obj.stream_mode();
1505-
if stream_mode {
1506-
return Err(ConnectionError::Protocol(
1507-
protocol::ErrorResponse::error(
1508-
protocol::ErrorCode::FeatureNotSupported,
1509-
"DECLARE statement can not be used if CUBESQL_STREAM_MODE == true"
1510-
.to_string(),
1511-
)
1512-
.into(),
1513-
span_id.clone(),
1514-
));
1515-
}
1516-
15171503
// The default is to allow scrolling in some cases; this is not the same as specifying SCROLL.
15181504
if scroll.is_some() {
15191505
return Err(ConnectionError::Protocol(

0 commit comments

Comments
 (0)