Skip to content

Commit 8c585f2

Browse files
authored
feat(cubesql): Streams - cancel query and drop conection handling
1 parent d5786df commit 8c585f2

File tree

3 files changed

+31
-7
lines changed

3 files changed

+31
-7
lines changed

packages/cubejs-backend-native/js/index.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,15 @@ function wrapNativeFunctionWithChannelCallback(
100100
e
101101
});
102102
}
103-
104-
channel.reject(e.message || 'Unknown JS exception');
103+
try {
104+
channel.reject(e.message || 'Unknown JS exception');
105+
} catch(e) {
106+
if (process.env.CUBEJS_NATIVE_INTERNAL_DEBUG) {
107+
console.debug("[js] channel.reject exception", {
108+
e
109+
});
110+
}
111+
}
105112

106113
// throw e;
107114
}
@@ -121,7 +128,9 @@ function wrapNativeFunctionWithStream(
121128
chunk.push(c);
122129
if (chunk.length >= 10000) {
123130
if (!writer.chunk(JSON.stringify(chunk))) {
124-
streamResponse.stream.removeAllListeners();
131+
streamResponse.stream.destroy({
132+
stack: "Rejected by client"
133+
});
125134
}
126135
chunk = [];
127136
}

rust/cubesql/cubesql/src/compile/engine/df/scan.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ struct CubeScanMemoryStream {
407407
/// Schema representing the data
408408
schema: SchemaRef,
409409
member_fields: Vec<MemberField>,
410+
is_completed: bool,
410411
}
411412

412413
impl CubeScanMemoryStream {
@@ -419,17 +420,33 @@ impl CubeScanMemoryStream {
419420
stream,
420421
schema,
421422
member_fields,
423+
is_completed: false,
422424
}
423425
}
424426

425427
fn poll_next(&mut self) -> Option<ArrowResult<RecordBatch>> {
426428
match self.stream.poll_next() {
427-
Ok(chunk) => parse_chunk(chunk, self.schema.clone(), &self.member_fields),
429+
Ok(chunk) => {
430+
let chunk = parse_chunk(chunk, self.schema.clone(), &self.member_fields);
431+
if chunk.is_none() {
432+
self.is_completed = true;
433+
}
434+
435+
chunk
436+
}
428437
Err(err) => Some(Err(ArrowError::ComputeError(err.to_string()))),
429438
}
430439
}
431440
}
432441

442+
impl Drop for CubeScanMemoryStream {
443+
fn drop(&mut self) {
444+
if !self.is_completed {
445+
self.stream.reject();
446+
}
447+
}
448+
}
449+
433450
struct CubeScanStreamRouter {
434451
main_stream: CubeScanMemoryStream,
435452
one_shot_stream: CubeScanOneShotStream,

rust/cubesql/cubesql/src/transport/service.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,7 @@ impl CubeReadStream for CubeDummyStream {
8181
panic!("CubeDummyStream.poll_next - can not be called");
8282
}
8383

84-
fn reject(&self) {
85-
panic!("CubeDummyStream.reject - can not be called");
86-
}
84+
fn reject(&self) {}
8785
}
8886

8987
#[derive(Debug)]

0 commit comments

Comments
 (0)