Commit 5186d30

fix(cubesql): Replace stream buffering with async implementation (#6127)
* refactor(cubesql): Stream's buffer -> mpsc channel
* Integrate with upstream query streaming fixes
* Revert shim changes and fix tests
* Fix tests and clippy

---------

Co-authored-by: Pavel Tiunov <[email protected]>
1 parent 315a76e commit 5186d30
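The core of this change is in the native stream code below: the Condvar-guarded Vec buffer that previously backed CubeReadStream is replaced by a bounded tokio mpsc channel plus a oneshot "ready" handshake, so callers await stream start-up asynchronously and backpressure comes from the channel capacity instead of a blocked thread. A minimal, self-contained sketch of that pattern (illustrative only; the produce function, the String error type, and the channel capacity of 16 are assumptions made for this example, not code from the commit):

use tokio::sync::{mpsc, oneshot};

type Chunk = Result<String, String>;

async fn produce(ready: oneshot::Sender<Result<(), String>>, tx: mpsc::Sender<Chunk>) {
    // Tell the consumer the stream has started successfully.
    let _ = ready.send(Ok(()));
    for i in 0..3 {
        // try_send fails when the bounded channel is full or closed (backpressure).
        if tx.try_send(Ok(format!("chunk {i}"))).is_err() {
            return;
        }
    }
    // An empty chunk marks end-of-stream, mirroring JsWriteStream::end below.
    let _ = tx.try_send(Ok(String::new()));
}

#[tokio::main]
async fn main() {
    let (ready_tx, ready_rx) = oneshot::channel();
    let (tx, mut rx) = mpsc::channel::<Chunk>(16);

    tokio::spawn(produce(ready_tx, tx));

    // Hand the receiver to the consumer only after the producer reports readiness.
    ready_rx
        .await
        .expect("producer dropped")
        .expect("stream failed to start");

    while let Some(chunk) = rx.recv().await {
        match chunk {
            Ok(c) if c.is_empty() => break, // end-of-stream marker
            Ok(c) => println!("{c}"),
            Err(e) => {
                eprintln!("stream error: {e}");
                break;
            }
        }
    }
}

In the commit itself, the sender side lives in JsWriteStream (driven from JS through the start/chunk/end/reject callbacks) and the receiver is what call_js_with_stream_as_callback returns.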

File tree

14 files changed: +194, −239 lines changed

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 3 additions & 2 deletions
@@ -1550,7 +1550,7 @@ class ApiGateway {
   }> {
     const requestStarted = new Date();
     try {
-      this.log({ type: 'Streaming Query', query }, context);
+      this.log({ type: 'Load Request', query, streaming: true }, context);
       const [, normalizedQueries] = await this.getNormalizedQueries(query, context, true);
       const sqlQuery = (await this.getSqlQueriesInternal(context, normalizedQueries))[0];
       const q = {
@@ -1572,13 +1572,14 @@ class ApiGateway {
       };
       return _stream;
     } catch (e) {
+      // TODO handle error log
       this.log({
         type: 'Streaming Error',
         query,
         error: (<Error>e).message,
         duration: this.duration(requestStarted),
       }, context);
-      return null;
+      throw e;
     }
   }

packages/cubejs-backend-native/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

packages/cubejs-backend-native/js/index.ts

Lines changed: 7 additions & 3 deletions
@@ -127,12 +127,14 @@ function wrapNativeFunctionWithStream(
     let streamResponse: any;
     try {
       streamResponse = await fn(JSON.parse(extra));
-      if (streamResponse) {
+      if (streamResponse && streamResponse.stream) {
+        writer.start();
         let chunk: object[] = [];
         streamResponse.stream.on('data', (c: object) => {
           chunk.push(c);
           if (chunk.length >= chunkLength) {
             if (!writer.chunk(JSON.stringify(chunk))) {
+              // TODO replace with actual stream and high watermark implementation
               streamResponse.stream.destroy({
                 stack: "Rejected by client"
               });
@@ -147,14 +149,16 @@ function wrapNativeFunctionWithStream(
           writer.end("");
         });
         streamResponse.stream.on('error', (err: any) => {
-          writer.reject(err.message || "Unknown JS exception");
+          writer.reject(err.message || err.toString());
         });
+      } else {
+        throw new Error(`Expected stream but nothing returned`);
       }
     } catch (e: any) {
       if (!!streamResponse && !!streamResponse.stream) {
         streamResponse.stream.destroy(e);
       }
-      writer.reject(e.message || "Unknown JS exception");
+      writer.reject(e.message || e.toString());
     }
   };
 };
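Note that the native writer now exposes a start() callback in addition to chunk/end/reject, and the wrapper only calls writer.start() once a stream object is actually returned; on the Rust side (in the stream diff that follows) that call resolves the oneshot "ready" signal that call_js_with_stream_as_callback awaits before handing the receiver back.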
Lines changed: 55 additions & 81 deletions
@@ -1,82 +1,20 @@
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
-use cubesql::{transport::CubeReadStream, CubeError};
+use cubesql::CubeError;
 #[cfg(build = "debug")]
 use log::trace;
 use neon::prelude::*;
 
 use crate::utils::bind_method;
 
-use std::sync::{Condvar, Mutex};
+use tokio::sync::mpsc::{channel as mpsc_channel, Receiver, Sender};
+use tokio::sync::oneshot;
 
-type JsWriter = Arc<Buffer>;
-type BufferChunk = Result<Option<String>, CubeError>;
-
-#[derive(Debug)]
-struct Buffer {
-    data: Mutex<Vec<BufferChunk>>,
-    data_cv: Condvar,
-    rejected: Mutex<bool>,
-}
-
-impl Buffer {
-    fn new() -> Self {
-        Self {
-            data: Mutex::new(vec![]),
-            data_cv: Condvar::new(),
-            rejected: Mutex::new(false),
-        }
-    }
-
-    fn push(&self, chunk: BufferChunk) -> bool {
-        if *self.rejected.lock().expect("Can't lock") {
-            return false;
-        }
-
-        let mut lock = self.data.lock().expect("Can't lock");
-        // TODO: check size
-        while lock.len() >= 100 {
-            lock = self.data_cv.wait(lock).expect("Can't wait");
-        }
-        lock.push(chunk);
-        self.data_cv.notify_one();
-
-        true
-    }
-
-    fn release(&self, err: String) {
-        let mut lock = self.rejected.lock().expect("Can't lock");
-        if *lock {
-            return;
-        }
-
-        *lock = true;
-
-        let mut lock = self.data.lock().expect("Can't lock");
-        *lock = vec![Err(CubeError::user(err))];
-        self.data_cv.notify_one();
-    }
-}
-
-impl CubeReadStream for Buffer {
-    fn poll_next(&self) -> BufferChunk {
-        let mut lock = self.data.lock().expect("Can't lock");
-        while lock.is_empty() {
-            lock = self.data_cv.wait(lock).expect("Can't wait");
-        }
-        let chunk = lock.drain(0..1).last().unwrap();
-        self.data_cv.notify_one();
-
-        chunk
-    }
-
-    fn reject(&self) {
-        self.release("rejected".to_string());
-    }
-}
+type Chunk = Result<String, CubeError>;
 
 pub struct JsWriteStream {
-    writer: JsWriter,
+    sender: Sender<Chunk>,
+    ready_sender: Mutex<Option<oneshot::Sender<Result<(), CubeError>>>>,
 }
 
 impl Finalize for JsWriteStream {}
@@ -92,6 +30,10 @@ impl JsWriteStream {
         let chunk = bind_method(cx, chunk_fn, obj_this)?;
         obj.set(cx, "chunk", chunk)?;
 
+        let start_fn = JsFunction::new(cx, js_stream_start)?;
+        let start_stream = bind_method(cx, start_fn, obj_this)?;
+        obj.set(cx, "start", start_stream)?;
+
         let end_fn = JsFunction::new(cx, js_stream_end)?;
         let end_stream = bind_method(cx, end_fn, obj_this)?;
         obj.set(cx, "end", end_stream)?;
@@ -104,15 +46,27 @@ impl JsWriteStream {
     }
 
     fn push_chunk(&self, chunk: String) -> bool {
-        self.writer.push(Ok(Some(chunk)))
+        match self.sender.try_send(Ok(chunk)) {
+            Err(_) => false,
+            Ok(_) => true,
+        }
+    }
+
+    fn start(&self) {
+        if let Some(ready_sender) = self.ready_sender.lock().unwrap().take() {
+            let _ = ready_sender.send(Ok(()));
+        }
     }
 
     fn end(&self) {
-        self.writer.push(Ok(None));
+        self.push_chunk("".to_string());
     }
 
     fn reject(&self, err: String) {
-        self.writer.release(err);
+        if let Some(ready_sender) = self.ready_sender.lock().unwrap().take() {
+            let _ = ready_sender.send(Err(CubeError::internal(err.to_string())));
+        }
+        let _ = self.sender.try_send(Err(CubeError::internal(err)));
     }
 }
@@ -129,6 +83,18 @@ fn js_stream_push_chunk(mut cx: FunctionContext) -> JsResult<JsBoolean> {
     Ok(cx.boolean(result))
 }
 
+fn js_stream_start(mut cx: FunctionContext) -> JsResult<JsUndefined> {
+    #[cfg(build = "debug")]
+    trace!("JsWriteStream.start");
+
+    let this = cx
+        .this()
+        .downcast_or_throw::<JsBox<JsWriteStream>, _>(&mut cx)?;
+    this.start();
+
+    Ok(cx.undefined())
+}
+
 fn js_stream_end(mut cx: FunctionContext) -> JsResult<JsUndefined> {
     #[cfg(build = "debug")]
     trace!("JsWriteStream.end");
@@ -150,18 +116,22 @@ fn js_stream_reject(mut cx: FunctionContext) -> JsResult<JsUndefined> {
         .downcast_or_throw::<JsBox<JsWriteStream>, _>(&mut cx)?;
     let result = cx.argument::<JsString>(0)?;
     this.reject(result.value(&mut cx));
-
     Ok(cx.undefined())
 }
 
-pub fn call_js_with_stream_as_callback(
+pub async fn call_js_with_stream_as_callback(
     channel: Arc<Channel>,
     js_method: Arc<Root<JsFunction>>,
     query: Option<String>,
-) -> Result<Arc<dyn CubeReadStream>, CubeError> {
-    let channel = channel;
-    let buffer = Arc::new(Buffer::new());
-    let writer = buffer.clone();
+) -> Result<Receiver<Chunk>, CubeError> {
+    let chunk_size = std::env::var("CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK")
+        .ok()
+        .map(|v| v.parse::<usize>().unwrap())
+        .unwrap_or(8192);
+    let channel_size = 1_000_000 / chunk_size;
+
+    let (sender, receiver) = mpsc_channel::<Chunk>(channel_size);
+    let (ready_sender, ready_receiver) = oneshot::channel();
 
     channel
         .try_send(move |mut cx| {
@@ -171,8 +141,10 @@ pub fn call_js_with_stream_as_callback(
                 Err(v) => v.as_ref().to_inner(&mut cx),
             };
 
-            let stream = JsWriteStream { writer };
-
+            let stream = JsWriteStream {
+                sender,
+                ready_sender: Mutex::new(Some(ready_sender)),
+            };
             let this = cx.undefined();
             let args: Vec<Handle<_>> = vec![
                 if let Some(q) = query {
@@ -190,5 +162,7 @@ pub fn call_js_with_stream_as_callback(
             CubeError::internal(format!("Unable to send js call via channel, err: {}", err))
         })?;
 
-    Ok(buffer)
+    ready_receiver.await??;
+
+    Ok(receiver)
 }
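With the default CUBEJS_DB_QUERY_STREAM_HIGH_WATER_MARK of 8192, channel_size works out to 1_000_000 / 8192 = 122 chunks that can sit in the channel before try_send returns false and the JS side destroys the source stream; a larger high-water mark means fewer, bigger chunks in flight. The ready_receiver.await?? line surfaces both a dropped sender and an explicit reject as a CubeError before any receiver is returned, and the parse::<usize>().unwrap() will panic if the variable is set to a non-numeric value.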

packages/cubejs-backend-native/src/transport.rs

Lines changed: 35 additions & 24 deletions
@@ -6,7 +6,7 @@ use cubeclient::models::{V1Error, V1LoadRequestQuery, V1LoadResponse, V1MetaResp
 use cubesql::{
     di_service,
     sql::AuthContextRef,
-    transport::{CubeReadStream, LoadRequestMeta, MetaContext, TransportService},
+    transport::{CubeStreamReceiver, LoadRequestMeta, MetaContext, TransportService},
     CubeError,
 };
 use serde_derive::Serialize;
@@ -164,37 +164,48 @@ impl TransportService for NodeBridgeTransport {
         }
     }
 
-    fn load_stream(
+    async fn load_stream(
         &self,
         query: V1LoadRequestQuery,
         ctx: AuthContextRef,
         meta: LoadRequestMeta,
-    ) -> Result<Arc<dyn CubeReadStream>, CubeError> {
+    ) -> Result<CubeStreamReceiver, CubeError> {
         trace!("[transport] Request ->");
 
-        let native_auth = ctx
-            .as_any()
-            .downcast_ref::<NativeAuthContext>()
-            .expect("Unable to cast AuthContext to NativeAuthContext");
+        loop {
+            let native_auth = ctx
+                .as_any()
+                .downcast_ref::<NativeAuthContext>()
+                .expect("Unable to cast AuthContext to NativeAuthContext");
 
-        let request_id = Uuid::new_v4().to_string();
-        let extra = serde_json::to_string(&LoadRequest {
-            request: TransportRequest {
-                id: format!("{}-span-{}", request_id, 1),
-                meta: Some(meta),
-            },
-            query,
-            session: SessionContext {
-                user: native_auth.user.clone(),
-                superuser: native_auth.superuser,
-            },
-        })?;
+            let request_id = Uuid::new_v4().to_string();
+            let extra = serde_json::to_string(&LoadRequest {
+                request: TransportRequest {
+                    id: format!("{}-span-{}", request_id, 1),
+                    meta: Some(meta.clone()),
+                },
+                query: query.clone(),
+                session: SessionContext {
+                    user: native_auth.user.clone(),
+                    superuser: native_auth.superuser,
+                },
+            })?;
 
-        call_js_with_stream_as_callback(
-            self.channel.clone(),
-            self.on_load_stream.clone(),
-            Some(extra),
-        )
+            let res = call_js_with_stream_as_callback(
+                self.channel.clone(),
+                self.on_load_stream.clone(),
+                Some(extra),
+            )
+            .await;
+
+            if let Err(e) = &res {
+                if e.message.to_lowercase() == "continue wait" {
+                    continue;
+                }
+            }
+
+            break res;
+        }
     }
 }
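load_stream now retries the whole JS round trip whenever the returned error message is "continue wait" (compared case-insensitively), which is why meta and query are cloned on each iteration of the loop; any other outcome, success or a real error, leaves the loop via break res.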

packages/cubejs-jdbc-driver/src/JDBCDriver.ts

Lines changed: 5 additions & 1 deletion
@@ -259,9 +259,13 @@ export class JDBCDriver extends BaseDriver {
       ) => {
         if (err) reject(err);
         const rowsStream = new QueryStream(res.rows.next);
+        let connectionReleased = false;
         const cleanup = (e?: Error) => {
-          if (!rowsStream.destroyed) {
+          if (!connectionReleased) {
             this.pool.release(conn);
+            connectionReleased = true;
+          }
+          if (!rowsStream.destroyed) {
             rowsStream.destroy(e);
           }
         };

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 2 additions & 1 deletion
@@ -597,6 +597,7 @@ export class QueryCache {
       let logged = false;
       Promise
         .all([clientFactory()])
+        // TODO use stream method instead
         .then(([client]) => client.streamQuery(req.query, req.values))
         .then((source) => {
           const cleanup = (error) => {
@@ -636,7 +637,7 @@ export class QueryCache {
         })
         .catch((reason) => {
           target.emit('error', reason);
-          throw reason;
+          resolve(reason);
         });
       }));
     },
