Commit a9b19fa

fix(cubesql): Use writable streams with plain objects instead of JSON.stringify pipe for streaming capability (#6306)
* fix(cubesql): Use writable streams with plain objects instead of JSON.stringify pipe for streaming capability
* Fix clippy
1 parent d74a1f0 commit a9b19fa

File tree

7 files changed: +321 -129 lines changed


packages/cubejs-backend-native/js/index.ts

Lines changed: 43 additions & 18 deletions
@@ -1,5 +1,6 @@
 import fs from 'fs';
 import path from 'path';
+import { Writable } from 'stream';
 // import { getEnv } from '@cubejs-backend/shared';

 export interface BaseMeta {
@@ -115,6 +116,9 @@ function wrapNativeFunctionWithChannelCallback(
   };
 };

+const errorString = (err: any) =>
+  err.message || err.stack?.toString() || typeof err === 'string' ? err.toString() : JSON.stringify(err);
+
 // TODO: Refactor - define classes
 function wrapNativeFunctionWithStream(
   fn: (extra: any) => unknown | Promise<unknown>
@@ -129,27 +133,48 @@ function wrapNativeFunctionWithStream(
       streamResponse = await fn(JSON.parse(extra));
       if (streamResponse && streamResponse.stream) {
         writer.start();
-        let chunk: object[] = [];
-        streamResponse.stream.on('data', (c: object) => {
-          chunk.push(c);
-          if (chunk.length >= chunkLength) {
-            if (!writer.chunk(JSON.stringify(chunk))) {
-              // TODO replace with actual stream and high watermark implementation
-              streamResponse.stream.destroy({
-                stack: "Rejected by client"
-              });
+
+        let chunkBuffer: any[] = [];
+        const writable = new Writable({
+          objectMode: true,
+          highWaterMark: chunkLength,
+          write(row: any, encoding: BufferEncoding, callback: (error?: (Error | null)) => void) {
+            chunkBuffer.push(row);
+            if (chunkBuffer.length < chunkLength) {
+              callback(null);
+            } else {
+              const toSend = chunkBuffer;
+              chunkBuffer = [];
+              writer.chunk(toSend, callback);
             }
-            chunk = [];
-          }
-        });
-        streamResponse.stream.on('close', () => {
-          if (chunk.length > 0) {
-            writer.chunk(JSON.stringify(chunk));
+
+          },
+          final(callback: (error?: (Error | null)) => void) {
+            const end = (err: any) => {
+              if (err) {
+                callback(err);
+              } else {
+                writer.end(callback);
+              }
+            }
+            if (chunkBuffer.length > 0) {
+              const toSend = chunkBuffer;
+              chunkBuffer = [];
+              writer.chunk(toSend, end);
+            } else {
+              end(null);
+            }
+          },
+          destroy(error: Error | null, callback: (error: (Error | null)) => void) {
+            if (error) {
+              writer.reject(errorString(error));
+            }
+            callback(null);
           }
-          writer.end("");
         });
+        streamResponse.stream.pipe(writable);
         streamResponse.stream.on('error', (err: any) => {
-          writer.reject(err.message || err.toString());
+          writable.destroy(err);
         });
       } else {
         throw new Error(`Expected stream but nothing returned`);
@@ -158,7 +183,7 @@ function wrapNativeFunctionWithStream(
       if (!!streamResponse && !!streamResponse.stream) {
         streamResponse.stream.destroy(e);
       }
-      writer.reject(e.message || e.toString());
+      writer.reject(errorString(e));
     }
   };
 };
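
For readers unfamiliar with object-mode writables, the following is a minimal, self-contained TypeScript sketch of the batching pattern introduced above; it is not part of the commit. Rows are buffered until chunkLength is reached, and the write callback is only invoked once the downstream writer has accepted the batch, so piping the source into the Writable yields backpressure without the old JSON.stringify round-trip. The fakeWriter object is a hypothetical stand-in for the native writer; only its callback-based chunk/end shape mirrors the diff.

    import { Readable, Writable } from 'stream';

    const chunkLength = 3;

    // Hypothetical stand-in for the native writer; the real one comes from the bindings.
    const fakeWriter = {
      chunk(rows: any[], callback: (error?: Error | null) => void) {
        console.log('chunk of', rows.length, 'rows');
        setImmediate(() => callback(null)); // backpressure is released only here
      },
      end(callback: (error?: Error | null) => void) {
        console.log('end of stream');
        callback(null);
      },
    };

    let buffer: any[] = [];
    const writable = new Writable({
      objectMode: true,
      highWaterMark: chunkLength,
      write(row: any, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
        buffer.push(row);
        if (buffer.length < chunkLength) {
          callback(null); // keep accepting rows until a full batch is buffered
        } else {
          const toSend = buffer;
          buffer = [];
          fakeWriter.chunk(toSend, callback); // pause the source until the batch is accepted
        }
      },
      final(callback: (error?: Error | null) => void) {
        const finish = (err?: Error | null) => (err ? callback(err) : fakeWriter.end(callback));
        if (buffer.length > 0) {
          const toSend = buffer;
          buffer = [];
          fakeWriter.chunk(toSend, finish); // flush the partial batch, then signal end
        } else {
          fakeWriter.end(callback);
        }
      },
    });

    Readable.from([1, 2, 3, 4, 5, 6, 7]).pipe(writable);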

packages/cubejs-backend-native/src/stream.rs

Lines changed: 124 additions & 16 deletions
@@ -1,3 +1,7 @@
+use cubesql::compile::engine::df::scan::{
+    transform_response, FieldValue, MemberField, RecordBatch, SchemaRef, ValueObject,
+};
+use std::future::Future;
 use std::sync::{Arc, Mutex};

 use cubesql::CubeError;
@@ -10,11 +14,14 @@ use crate::utils::bind_method;
 use tokio::sync::mpsc::{channel as mpsc_channel, Receiver, Sender};
 use tokio::sync::oneshot;

-type Chunk = Result<String, CubeError>;
+type Chunk = Option<Result<RecordBatch, CubeError>>;

 pub struct JsWriteStream {
     sender: Sender<Chunk>,
     ready_sender: Mutex<Option<oneshot::Sender<Result<(), CubeError>>>>,
+    tokio_handle: tokio::runtime::Handle,
+    schema: SchemaRef,
+    member_fields: Vec<MemberField>,
 }

 impl Finalize for JsWriteStream {}
@@ -45,10 +52,13 @@ impl JsWriteStream {
         Ok(obj)
     }

-    fn push_chunk(&self, chunk: String) -> bool {
-        match self.sender.try_send(Ok(chunk)) {
-            Err(_) => false,
-            Ok(_) => true,
+    fn push_chunk(&self, chunk: RecordBatch) -> impl Future<Output = Result<(), CubeError>> {
+        let sender = self.sender.clone();
+        async move {
+            sender
+                .send(Some(Ok(chunk)))
+                .await
+                .map_err(|e| CubeError::user(format!("Can't send to channel: {}", e)))
         }
     }

@@ -58,29 +68,119 @@ impl JsWriteStream {
         }
     }

-    fn end(&self) {
-        self.push_chunk("".to_string());
+    fn end(&self) -> impl Future<Output = Result<(), CubeError>> {
+        let sender = self.sender.clone();
+        async move {
+            sender
+                .send(None)
+                .await
+                .map_err(|e| CubeError::user(format!("Can't send to channel: {}", e)))
+        }
     }

     fn reject(&self, err: String) {
         if let Some(ready_sender) = self.ready_sender.lock().unwrap().take() {
             let _ = ready_sender.send(Err(CubeError::internal(err.to_string())));
         }
-        let _ = self.sender.try_send(Err(CubeError::internal(err)));
+        let _ = self.sender.try_send(Some(Err(CubeError::internal(err))));
     }
 }

-fn js_stream_push_chunk(mut cx: FunctionContext) -> JsResult<JsBoolean> {
+fn wait_for_future_and_execute_callback(
+    tokio_handle: tokio::runtime::Handle,
+    channel: Channel,
+    callback: Root<JsFunction>,
+    future: impl Future<Output = Result<(), CubeError>> + Send + Sync + 'static,
+) {
+    tokio_handle.spawn(async move {
+        let push_result = future.await;
+        let send_result = channel.try_send(move |mut cx| {
+            let undefined = cx.undefined();
+            let result = match push_result {
+                Ok(()) => {
+                    let args = vec![cx.null().upcast::<JsValue>(), cx.null().upcast::<JsValue>()];
+                    callback.into_inner(&mut cx).call(&mut cx, undefined, args)
+                }
+                Err(e) => {
+                    let args = vec![cx.string(e.message).upcast::<JsValue>()];
+                    callback.into_inner(&mut cx).call(&mut cx, undefined, args)
+                }
+            };
+            if let Err(e) = result {
+                log::error!("Error during callback execution: {}", e);
+            }
+            Ok(())
+        });
+        if let Err(e) = send_result {
+            log::error!("Can't execute callback on node event loop: {}", e);
+        }
+    });
+}
+
+pub struct JsValueObject<'a> {
+    pub cx: FunctionContext<'a>,
+    pub handle: Handle<'a, JsArray>,
+}
+
+impl ValueObject for JsValueObject<'_> {
+    fn len(&mut self) -> Result<usize, CubeError> {
+        Ok(self.handle.len(&mut self.cx) as usize)
+    }
+
+    fn get(&mut self, index: usize, field_name: &str) -> Result<FieldValue, CubeError> {
+        let value = self
+            .handle
+            .get::<JsObject, _, _>(&mut self.cx, index as u32)
+            .map_err(|e| {
+                CubeError::user(format!("Can't get object at array index {}: {}", index, e))
+            })?
+            .get::<JsValue, _, _>(&mut self.cx, field_name)
+            .map_err(|e| {
+                CubeError::user(format!("Can't get '{}' field value: {}", field_name, e))
+            })?;
+        if let Ok(s) = value.downcast::<JsString, _>(&mut self.cx) {
+            Ok(FieldValue::String(s.value(&mut self.cx)))
+        } else if let Ok(n) = value.downcast::<JsNumber, _>(&mut self.cx) {
+            Ok(FieldValue::Number(n.value(&mut self.cx)))
+        } else if let Ok(b) = value.downcast::<JsBoolean, _>(&mut self.cx) {
+            Ok(FieldValue::Bool(b.value(&mut self.cx)))
+        } else if value.downcast::<JsUndefined, _>(&mut self.cx).is_ok()
+            || value.downcast::<JsNull, _>(&mut self.cx).is_ok()
+        {
+            Ok(FieldValue::Null)
+        } else {
+            Err(CubeError::user(format!(
+                "Expected primitive value but found: {:?}",
+                value
+            )))
+        }
+    }
+}
+
+fn js_stream_push_chunk(mut cx: FunctionContext) -> JsResult<JsUndefined> {
     #[cfg(build = "debug")]
     trace!("JsWriteStream.push_chunk");

     let this = cx
         .this()
         .downcast_or_throw::<JsBox<JsWriteStream>, _>(&mut cx)?;
-    let result = cx.argument::<JsString>(0)?;
-    let result = this.push_chunk(result.value(&mut cx));
-
-    Ok(cx.boolean(result))
+    let chunk_array = cx.argument::<JsArray>(0)?;
+    let callback = cx.argument::<JsFunction>(1)?.root(&mut cx);
+    let mut value_object = JsValueObject {
+        cx,
+        handle: chunk_array,
+    };
+    let value =
+        transform_response(&mut value_object, this.schema.clone(), &this.member_fields).unwrap();
+    let future = this.push_chunk(value);
+    wait_for_future_and_execute_callback(
+        this.tokio_handle.clone(),
+        value_object.cx.channel(),
+        callback,
+        future,
+    );
+
+    Ok(value_object.cx.undefined())
 }

 fn js_stream_start(mut cx: FunctionContext) -> JsResult<JsUndefined> {
@@ -102,7 +202,9 @@ fn js_stream_end(mut cx: FunctionContext) -> JsResult<JsUndefined> {
     let this = cx
         .this()
         .downcast_or_throw::<JsBox<JsWriteStream>, _>(&mut cx)?;
-    this.end();
+    let future = this.end();
+    let callback = cx.argument::<JsFunction>(0)?.root(&mut cx);
+    wait_for_future_and_execute_callback(this.tokio_handle.clone(), cx.channel(), callback, future);

     Ok(cx.undefined())
 }
@@ -123,16 +225,19 @@ pub async fn call_js_with_stream_as_callback(
     channel: Arc<Channel>,
     js_method: Arc<Root<JsFunction>>,
     query: Option<String>,
+    schema: SchemaRef,
+    member_fields: Vec<MemberField>,
 ) -> Result<Receiver<Chunk>, CubeError> {
-    let chunk_size = std::env::var("CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK")
+    let channel_size = std::env::var("CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK")
        .ok()
        .map(|v| v.parse::<usize>().unwrap())
        .unwrap_or(8192);
-    let channel_size = 1_000_000 / chunk_size;

     let (sender, receiver) = mpsc_channel::<Chunk>(channel_size);
     let (ready_sender, ready_receiver) = oneshot::channel();

+    let tokio_handle = tokio::runtime::Handle::current();
+
     channel
         .try_send(move |mut cx| {
             // https://github.com/neon-bindings/neon/issues/672
@@ -144,6 +249,9 @@ pub async fn call_js_with_stream_as_callback(
             let stream = JsWriteStream {
                 sender,
                 ready_sender: Mutex::new(Some(ready_sender)),
+                tokio_handle,
+                schema,
+                member_fields,
             };
             let this = cx.undefined();
             let args: Vec<Handle<_>> = vec![
packages/cubejs-backend-native/src/transport.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use neon::prelude::*;

 use async_trait::async_trait;
 use cubeclient::models::{V1Error, V1LoadRequestQuery, V1LoadResponse, V1MetaResponse};
+use cubesql::compile::engine::df::scan::{MemberField, SchemaRef};
 use cubesql::{
     di_service,
     sql::AuthContextRef,
@@ -175,6 +176,8 @@ impl TransportService for NodeBridgeTransport {
         query: V1LoadRequestQuery,
         ctx: AuthContextRef,
         meta: LoadRequestMeta,
+        schema: SchemaRef,
+        member_fields: Vec<MemberField>,
     ) -> Result<CubeStreamReceiver, CubeError> {
         trace!("[transport] Request ->");

@@ -201,11 +204,13 @@
                 self.channel.clone(),
                 self.on_load_stream.clone(),
                 Some(extra),
+                schema.clone(),
+                member_fields.clone(),
             )
             .await;

             if let Err(e) = &res {
-                if e.message.to_lowercase() == "continue wait" {
+                if e.message.to_lowercase().contains("continue wait") {
                     continue;
                 }
             }
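
The last hunk loosens the retry condition: previously the loop only retried when the error text was exactly "continue wait", while now any error whose message contains the phrase (for example, one wrapped with extra context) triggers another attempt. A rough TypeScript illustration of the same retry semantics, with loadStreamWithRetry and loadStreamOnce as made-up placeholder names rather than real Cube APIs:

    async function loadStreamWithRetry<T>(loadStreamOnce: () => Promise<T>): Promise<T> {
      for (;;) {
        try {
          return await loadStreamOnce();
        } catch (e: any) {
          const message = String(e?.message ?? e).toLowerCase();
          if (message.includes('continue wait')) {
            continue; // the orchestrator is still preparing the result; ask again
          }
          throw e;
        }
      }
    }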

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 6 additions & 6 deletions
@@ -601,10 +601,10 @@
       .then(([client]) => client.streamQuery(req.query, req.values))
       .then((source) => {
         const cleanup = (error) => {
-          if (!source.destroyed) {
+          if (error && !source.destroyed) {
             source.destroy(error);
           }
-          if (!target.destroyed) {
+          if (error && !target.destroyed) {
             target.destroy(error);
           }
           if (!logged && source.destroyed && target.destroyed) {
@@ -625,13 +625,13 @@
           }
         };

-        source.once('end', cleanup);
+        source.once('end', () => cleanup(undefined));
         source.once('error', cleanup);
-        source.once('close', cleanup);
+        source.once('close', () => cleanup(undefined));

-        target.once('end', cleanup);
+        target.once('end', () => cleanup(undefined));
         target.once('error', cleanup);
-        target.once('close', cleanup);
+        target.once('close', () => cleanup(undefined));

         source.pipe(target);
       })
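
The cleanup change is subtle: before, a normal 'end' or 'close' event passed no error yet still destroyed the other stream, which could cut the pipe short; now destruction only happens when an error is actually present. A condensed TypeScript sketch of the resulting wiring, with wireCleanup and onDone as hypothetical names rather than code from this repository:

    import { Readable, Writable } from 'stream';

    function wireCleanup(source: Readable, target: Writable, onDone: (err?: Error) => void) {
      let done = false;
      const cleanup = (error?: Error) => {
        // Destroy the streams only when something actually went wrong.
        if (error && !source.destroyed) source.destroy(error);
        if (error && !target.destroyed) target.destroy(error);
        // Report once, after both sides have fully closed.
        if (!done && source.destroyed && target.destroyed) {
          done = true;
          onDone(error);
        }
      };

      source.once('end', () => cleanup(undefined));
      source.once('error', cleanup);
      source.once('close', () => cleanup(undefined));

      target.once('end', () => cleanup(undefined));
      target.once('error', cleanup);
      target.once('close', () => cleanup(undefined));

      source.pipe(target);
    }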
