119 changes: 73 additions & 46 deletions packages/cubejs-backend-native/src/node_export.rs
@@ -1,6 +1,8 @@
 use cubesql::compile::parser::parse_sql_to_statement;
 use cubesql::compile::{convert_statement_to_cube_query, get_df_batches};
 use cubesql::config::processing_loop::ShutdownMode;
+use cubesql::sql::dataframe::{arrow_to_column_type, Column};
+use cubesql::sql::ColumnFlags;
 use cubesql::transport::{SpanId, TransportService};
 use futures::StreamExt;

@@ -192,6 +194,32 @@ fn shutdown_interface(mut cx: FunctionContext) -> JsResult<JsPromise> {
 
 const CHUNK_DELIM: &str = "\n";
 
+async fn write_jsonl_message(
+    channel: Arc<Channel>,
+    write_fn: Arc<Root<JsFunction>>,
+    stream: Arc<Root<JsObject>>,
+    value: serde_json::Value,
+) -> Result<bool, CubeError> {
+    let message = format!("{}{}", serde_json::to_string(&value)?, CHUNK_DELIM);
+
+    call_js_fn(
+        channel,
+        write_fn,
+        Box::new(move |cx| {
+            let arg = cx.string(message).upcast::<JsValue>();
+            Ok(vec![arg.upcast::<JsValue>()])
+        }),
+        Box::new(|cx, v| match v.downcast_or_throw::<JsBoolean, _>(cx) {
+            Ok(v) => Ok(v.value(cx)),
+            Err(_) => Err(CubeError::internal(
+                "Failed to downcast write response".to_string(),
+            )),
+        }),
+        stream,
+    )
+    .await
+}
+
 async fn handle_sql_query(
     services: Arc<NodeCubeServices>,
     native_auth_ctx: Arc<NativeSQLAuthContext>,
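A note on the new helper: write_jsonl_message centralizes the JSONL framing, where every message is a single JSON document terminated by CHUNK_DELIM ("\n") and the boolean returned by the JavaScript write callback (false signals backpressure) is passed back to the caller. For orientation, here is a minimal TypeScript sketch of the consuming side, using the same buffer-and-split pattern as the tests below; the helper name and callback shape are illustrative, not part of this PR:

```typescript
// Minimal JSONL consumer sketch (hypothetical helper, not part of this PR):
// buffers partial chunks and emits one parsed JSON object per complete line.
export function createJsonlParser(onMessage: (msg: unknown) => void) {
  let buf = '';
  return (chunk: Buffer | string): void => {
    const lines = (buf + chunk.toString()).split('\n');
    buf = lines.pop() ?? ''; // keep a trailing partial line for the next chunk
    for (const line of lines) {
      if (line.trim().length) {
        onMessage(JSON.parse(line));
      }
    }
  };
}
```

Concatenating the leftover buffer before splitting is what keeps the parser correct when a frame arrives split across two chunks.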
@@ -262,59 +290,44 @@ async fn handle_sql_query(
 
         drain_handler.handle(stream_methods.on.clone()).await?;
 
-        let mut is_first_batch = true;
-        while let Some(batch) = stream.next().await {
-            let (columns, data) = batch_to_rows(batch?)?;
+        // Get schema from stream and convert to DataFrame columns format
+        let stream_schema = stream.schema();
+        let mut columns = Vec::with_capacity(stream_schema.fields().len());
+        for field in stream_schema.fields().iter() {
+            columns.push(Column::new(
+                field.name().clone(),
+                arrow_to_column_type(field.data_type().clone())?,
+                ColumnFlags::empty(),
+            ));
+        }
 
-            if is_first_batch {
-                let mut schema = Map::new();
-                schema.insert("schema".into(), columns);
-                let columns = format!(
-                    "{}{}",
-                    serde_json::to_string(&serde_json::Value::Object(schema))?,
-                    CHUNK_DELIM
-                );
-                is_first_batch = false;
+        // Send schema first
+        let columns_json = serde_json::to_value(&columns)?;
+        let mut schema_response = Map::new();
+        schema_response.insert("schema".into(), columns_json);
 
-                call_js_fn(
-                    channel.clone(),
-                    stream_methods.write.clone(),
-                    Box::new(|cx| {
-                        let arg = cx.string(columns).upcast::<JsValue>();
+        write_jsonl_message(
+            channel.clone(),
+            stream_methods.write.clone(),
+            stream_methods.stream.clone(),
+            serde_json::Value::Object(schema_response),
+        )
+        .await?;
 
-                        Ok(vec![arg.upcast::<JsValue>()])
-                    }),
-                    Box::new(|cx, v| match v.downcast_or_throw::<JsBoolean, _>(cx) {
-                        Ok(v) => Ok(v.value(cx)),
-                        Err(_) => Err(CubeError::internal(
-                            "Failed to downcast write response".to_string(),
-                        )),
-                    }),
-                    stream_methods.stream.clone(),
-                )
-                .await?;
-            }
+        // Process all batches
+        let mut has_data = false;
+        while let Some(batch) = stream.next().await {
+            let (_, data) = batch_to_rows(batch?)?;
+            has_data = true;
 
             let mut rows = Map::new();
             rows.insert("data".into(), serde_json::Value::Array(data));
-            let data = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM);
-            let js_stream_write_fn = stream_methods.write.clone();
 
-            let should_pause = !call_js_fn(
+            let should_pause = !write_jsonl_message(
                 channel.clone(),
-                js_stream_write_fn,
-                Box::new(|cx| {
-                    let arg = cx.string(data).upcast::<JsValue>();
-
-                    Ok(vec![arg.upcast::<JsValue>()])
-                }),
-                Box::new(|cx, v| match v.downcast_or_throw::<JsBoolean, _>(cx) {
-                    Ok(v) => Ok(v.value(cx)),
-                    Err(_) => Err(CubeError::internal(
-                        "Failed to downcast write response".to_string(),
-                    )),
-                }),
+                stream_methods.write.clone(),
                 stream_methods.stream.clone(),
+                serde_json::Value::Object(rows),
             )
             .await?;
 
@@ -324,6 +337,20 @@ async fn handle_sql_query(
             }
         }
 
+        // If no data was processed, send empty data
+        if !has_data {
+            let mut rows = Map::new();
+            rows.insert("data".into(), serde_json::Value::Array(vec![]));
+
+            write_jsonl_message(
+                channel.clone(),
+                stream_methods.write.clone(),
+                stream_methods.stream.clone(),
+                serde_json::Value::Object(rows),
+            )
+            .await?;
+        }
+
         Ok::<(), CubeError>(())
     };

@@ -465,13 +492,13 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult<JsValue> {
             Err(err) => {
                 let mut error_response = Map::new();
                 error_response.insert("error".into(), err.to_string().into());
-                let error_response = format!(
+                let error_message = format!(
                     "{}{}",
                     serde_json::to_string(&serde_json::Value::Object(error_response))
                         .expect("Failed to serialize error response to JSON"),
                     CHUNK_DELIM
                 );
-                let arg = cx.string(error_response).upcast::<JsValue>();
+                let arg = cx.string(error_message).upcast::<JsValue>();
 
                 vec![arg]
             }
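Taken together, the changes above fix the frame sequence on the wire: the schema frame is now derived from the stream's Arrow schema and written before any batch is read, each record batch becomes one data frame, and a stream that yields no batches still produces a single empty data frame. A rough TypeScript model of that contract follows; the field shapes are inferred from the tests below, so treat them as an approximation:

```typescript
// Approximate shapes of the JSONL frames emitted by handle_sql_query.
type SchemaFrame = { schema: Array<{ name: string; column_type: string }> };
type DataFrame = { data: unknown[] }; // one frame per record batch
type ErrorFrame = { error: string };
type Frame = SchemaFrame | DataFrame | ErrorFrame;

// Frame order for a successful query:
//   1. exactly one SchemaFrame, sent before any batch is read;
//   2. one DataFrame per batch;
//   3. a single DataFrame with `data: []` if the stream produced no batches.
function isEmptyResult(frames: Frame[]): boolean {
  return frames.some((f) => 'data' in f && f.data.length === 0);
}
```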
70 changes: 70 additions & 0 deletions packages/cubejs-backend-native/test/sql.test.ts
@@ -372,4 +372,74 @@ describe('SQLInterface', () => {
expect(process.env.CUBESQL_STREAM_MODE).toBeFalsy();
}
});

test('schema from stream and empty data when no batches', async () => {
const interfaceMethods_ = interfaceMethods();
const instance = await native.registerInterface({
...interfaceMethods_,
canSwitchUserForSession: (_payload) => true,
});

let schemaReceived = false;
let dataReceived = false;
let emptyDataReceived = false;
let buf = '';

const write = jest.fn((chunk, _, callback) => {
const lines = (buf + chunk.toString('utf-8')).split('\n');
buf = lines.pop() || '';

lines
.filter((it) => it.trim().length)
.forEach((line) => {
const json = JSON.parse(line);

if (json.error) {
// Ignore errors for this test
return;
}

if (json.schema) {
schemaReceived = true;
expect(json.schema).toBeDefined();
expect(Array.isArray(json.schema)).toBe(true);
expect(json.data).toBeUndefined();
} else if (json.data) {
dataReceived = true;
// Check if it's empty data
if (Array.isArray(json.data) && json.data.length === 0) {
emptyDataReceived = true;
}
}
});

callback();
});

const cubeSqlStream = new Writable({
write,
});

try {
// Use LIMIT 0 to test the real case where SQL produces no results
await native.execSql(
instance,
'SELECT order_date FROM KibanaSampleDataEcommerce LIMIT 0;',
cubeSqlStream
);

// Verify schema was sent and empty data was sent for LIMIT 0 query
expect(schemaReceived).toBe(true);
expect(dataReceived).toBe(true);
expect(emptyDataReceived).toBe(true);
} catch (error) {
// Even if query fails, we should get schema
console.log('Query error (expected in test):', error);
if (schemaReceived) {
expect(schemaReceived).toBe(true);
}
}

await native.shutdownInterface(instance, 'fast');
});
});
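The Rust side pauses whenever the JS write callback reports backpressure (the should_pause branch) and resumes through the drain handler registered earlier in handle_sql_query. A sketch of test scaffolding that could exercise that path, under the assumption that a slow consumer is enough to trigger it; none of these names exist in the repository:

```typescript
import { Writable } from 'stream';

// Hypothetical scaffolding: a Writable with a tiny highWaterMark so write()
// returns false almost immediately, exercising the should_pause branch on the
// Rust side, which waits for the stream's 'drain' event before writing more.
function createSlowStream(onLine: (line: string) => void): Writable {
  return new Writable({
    highWaterMark: 1, // force backpressure after nearly every chunk
    write(chunk, _encoding, callback) {
      for (const line of chunk.toString().split('\n')) {
        if (line.trim().length) onLine(line);
      }
      setTimeout(callback, 10); // delayed ack simulates a slow consumer
    },
  });
}
```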
70 changes: 70 additions & 0 deletions packages/cubejs-testing/test/smoke-cubesql.test.ts
@@ -148,6 +148,76 @@ describe('SQL API', () => {
expect(rows).toBe(ROWS_LIMIT);
});

it('streams schema and empty data with LIMIT 0', async () => {
const response = await fetch(`${birdbox.configuration.apiUrl}/cubesql`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: token,
},
body: JSON.stringify({
query: `SELECT orderDate FROM ECommerce LIMIT 0;`,
}),
});

const reader = response.body;
let isFirstChunk = true;
let schemaReceived = false;
let emptyDataReceived = false;

let data = '';
const execute = () => new Promise<void>((resolve, reject) => {
const onData = jest.fn((chunk: Buffer) => {
const chunkStr = chunk.toString('utf-8');

if (isFirstChunk) {
isFirstChunk = false;
const json = JSON.parse(chunkStr);
expect(json.schema).toEqual([
{
name: 'orderDate',
column_type: 'Timestamp',
},
]);
schemaReceived = true;
} else {
data += chunkStr;
const json = JSON.parse(chunkStr);
if (json.data && Array.isArray(json.data) && json.data.length === 0) {
emptyDataReceived = true;
}
}
});
reader.on('data', onData);

const onError = jest.fn(() => reject(new Error('Stream error')));
reader.on('error', onError);

const onEnd = jest.fn(() => {
resolve();
});

reader.on('end', onEnd);
});

await execute();

// Verify schema was sent first
expect(schemaReceived).toBe(true);

// Verify empty data was sent
expect(emptyDataReceived).toBe(true);

// Verify no actual rows were returned
const dataLines = data.split('\n').filter((it) => it.trim());
if (dataLines.length > 0) {
const rows = dataLines
.map((it) => JSON.parse(it).data?.length || 0)
.reduce((a, b) => a + b, 0);
expect(rows).toBe(0);
}
});
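For the LIMIT 0 case, the assertions above imply the whole response reduces to two JSONL frames: the schema, then the guaranteed empty data frame. Reconstructed here for reference:

```typescript
// Reconstructed JSONL frames for `SELECT orderDate FROM ECommerce LIMIT 0;`
// (inferred from the assertions above; exact key order on the wire may vary):
const expectedFrames = [
  '{"schema":[{"name":"orderDate","column_type":"Timestamp"}]}',
  '{"data":[]}',
];
```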

describe('sql4sql', () => {
async function generateSql(query: string, disablePostPprocessing: boolean = false) {
const response = await fetch(`${birdbox.configuration.apiUrl}/sql`, {