Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bindings/nodejs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ doc = false
[dependencies]
chrono = { workspace = true }
databend-driver = { workspace = true, features = ["rustls", "flight-sql"] }
env_logger = "0.11.8"
tokio-stream = { workspace = true }

napi = { version = "2.16", default-features = false, features = [
Expand Down
3 changes: 1 addition & 2 deletions bindings/nodejs/generated.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,13 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}

const { ValueOptions, Client, Connection, ConnectionInfo, Schema, Field, RowIterator, RowIteratorExt, RowOrStats, Row, ServerStats } = nativeBinding
const { ValueOptions, Client, Connection, ConnectionInfo, Schema, RowIterator, RowIteratorExt, RowOrStats, Row, ServerStats } = nativeBinding

module.exports.ValueOptions = ValueOptions
module.exports.Client = Client
module.exports.Connection = Connection
module.exports.ConnectionInfo = ConnectionInfo
module.exports.Schema = Schema
module.exports.Field = Field
module.exports.RowIterator = RowIterator
module.exports.RowIteratorExt = RowIteratorExt
module.exports.RowOrStats = RowOrStats
Expand Down
12 changes: 8 additions & 4 deletions bindings/nodejs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

/* auto-generated by NAPI-RS */

export interface Field {
name: string
dataType: string
}
export declare class ValueOptions {
variantAsObject: boolean
}
Expand Down Expand Up @@ -58,6 +62,8 @@ export declare class Connection {
* The SQL can be `INSERT INTO tbl VALUES` or `REPLACE INTO tbl VALUES`.
*/
loadFile(sql: string, file: string, method?: string | undefined | null): Promise<ServerStats>
/** Close the Connection and release resources. */
close(): Promise<void>
}
export declare class ConnectionInfo {
get handler(): string
Expand All @@ -70,13 +76,10 @@ export declare class ConnectionInfo {
export declare class Schema {
fields(): Array<Field>
}
export declare class Field {
get name(): string
get dataType(): string
}
export declare class RowIterator {
/** Get Schema for rows. */
schema(): Schema
close(): void
/**
* Fetch next row.
* Returns `None` if there are no more rows.
Expand All @@ -90,6 +93,7 @@ export declare class RowIterator {
}
export declare class RowIteratorExt {
schema(): Schema
close(): void
/**
* Fetch next row or stats.
* Returns `None` if there are no more rows.
Expand Down
57 changes: 49 additions & 8 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_driver::LoadMethod;
use napi::{bindgen_prelude::*, Env};
use once_cell::sync::Lazy;
use std::str::FromStr;
use std::sync::Arc;
use std::{collections::HashMap, path::Path};
use tokio_stream::StreamExt;

Expand Down Expand Up @@ -75,15 +76,18 @@ impl Client {

#[napi]
pub struct Connection {
inner: databend_driver::Connection,
inner: Arc<databend_driver::Connection>,
opts: ValueOptions,
}

pub type Params = serde_json::Value;

impl Connection {
pub fn new(inner: databend_driver::Connection, opts: ValueOptions) -> Self {
Self { inner, opts }
Self {
inner: Arc::new(inner),
opts,
}
}
}

Expand Down Expand Up @@ -169,7 +173,11 @@ impl Connection {
self.inner.query_iter(&sql).await
};
let iterator = iterator.map_err(format_napi_error)?;
Ok(RowIterator::new(iterator, self.opts.clone()))
Ok(RowIterator::new(
iterator,
self.opts.clone(),
self.inner.clone(),
))
}

/// Execute a SQL query, and return all rows with schema and stats.
Expand All @@ -185,7 +193,11 @@ impl Connection {
self.inner.query_iter_ext(&sql).await
};
let iterator = iterator.map_err(format_napi_error)?;
Ok(RowIteratorExt::new(iterator, self.opts.clone()))
Ok(RowIteratorExt::new(
iterator,
self.opts.clone(),
self.inner.clone(),
))
}

/// Load data with stage attachment.
Expand Down Expand Up @@ -415,11 +427,16 @@ pub struct Field {
pub struct RowIterator {
inner: databend_driver::RowIterator,
opts: ValueOptions,
_conn: Arc<databend_driver::Connection>,
}

impl RowIterator {
pub fn new(inner: databend_driver::RowIterator, opts: ValueOptions) -> Self {
Self { inner, opts }
pub fn new(
inner: databend_driver::RowIterator,
opts: ValueOptions,
_conn: Arc<databend_driver::Connection>,
) -> Self {
Self { inner, opts, _conn }
}
}

Expand All @@ -431,6 +448,12 @@ impl RowIterator {
Schema(self.inner.schema().clone())
}

#[napi]
#[allow(clippy::missing_safety_doc)]
pub unsafe fn close(&mut self) {
self.inner.close()
}

/// Fetch next row.
/// Returns `None` if there are no more rows.
#[napi]
Expand All @@ -454,11 +477,16 @@ impl RowIterator {
pub struct RowIteratorExt {
inner: databend_driver::RowStatsIterator,
opts: ValueOptions,
_conn: Arc<databend_driver::Connection>,
}

impl RowIteratorExt {
pub fn new(inner: databend_driver::RowStatsIterator, opts: ValueOptions) -> Self {
Self { inner, opts }
pub fn new(
inner: databend_driver::RowStatsIterator,
opts: ValueOptions,
_conn: Arc<databend_driver::Connection>,
) -> Self {
Self { inner, opts, _conn }
}
}

Expand All @@ -469,6 +497,12 @@ impl RowIteratorExt {
Schema(self.inner.schema().clone())
}

#[napi]
#[allow(clippy::missing_safety_doc)]
pub unsafe fn close(&mut self) {
self.inner.close()
}

/// Fetch next row or stats.
/// Returns `None` if there are no more rows.
#[napi]
Expand Down Expand Up @@ -608,3 +642,10 @@ impl ServerStats {
fn format_napi_error(err: databend_driver::Error) -> Error {
Error::from_reason(format!("{err}"))
}

#[ctor]
fn init_logger() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
.target(env_logger::Target::Stdout)
.init();
}
91 changes: 89 additions & 2 deletions bindings/nodejs/tests/binding.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,25 @@ const { finished, pipeline } = require("node:stream/promises");

const assert = require("assert");
const { Given, When, Then } = require("@cucumber/cucumber");
const util = require("util");
const sleep = util.promisify(setTimeout);

let DB_VERSION = process.env.DB_VERSION;
if (DB_VERSION) {
DB_VERSION = DB_VERSION.split(".").map(Number);
} else {
DB_VERSION = [100, 0, 0];
}

let DRIVER_VERSION = process.env.DRIVER_VERSION;
if (DRIVER_VERSION) {
DRIVER_VERSION = DRIVER_VERSION.split(".").map(Number);
} else {
DRIVER_VERSION = [100, 0, 0];
}

process.env.DATABEND_DRIVER_HEARTBEAT_INTERVAL_SECONDS = "1";
process.env.RUST_LOG = "warn,databend_driver=debug,databend_client=debug";

const { Client } = require("../index.js");

Expand Down Expand Up @@ -356,7 +375,7 @@ Then("Load file with Streaming and Select should be equal", async function () {
assert.deepEqual(ret, expected);
});

Then("Temp table should work with cluster", async function () {
Then("Temp table is cleaned up when conn is dropped", async function () {
await this.conn.exec(`create or replace temp table temp_1(a int)`);
await this.conn.exec(`INSERT INTO temp_1 VALUES (1),(2)`);

Expand Down Expand Up @@ -418,6 +437,74 @@ Then("killQuery should return error for non-existent query ID", async function (
// Should get an error for non-existent query
assert.ok(err instanceof Error, "Should throw an Error object");
assert.ok(typeof err.message === "string" && err.message.length > 0, "Should return meaningful error message");
console.log("Expected error for non-existent query:", err.message);
// console.log("Expected error for non-existent query:", err.message);
}
});

Then("Query should not timeout", { timeout: 30000 }, async function () {
if (!(DRIVER_VERSION > [0, 30, 3] && DB_VERSION >= [1, 2, 709])) {
console.log("SKIP");
return;
}
const page_size = 10000;

const dsn = `databend://root:@localhost:8000/?sslmode=disable&wait_time_secs=3&max_rows_per_page=${page_size.toString()}`;
const client = new Client(dsn);

const conn = await client.getConn();
await conn.exec("set http_handler_result_timeout_secs=3");
const row = await conn.queryRow("show settings like 'http_handler_result_timeout_secs'");
assert.equal(row.values()[1], "3");

const sql = "select * from numbers(1000000000)";
const rows = await conn.queryIter(sql);

for (let i = 0; i < page_size; i++) {
const row = await rows.next();
assert.notEqual(row, null, "Row should not be null before sleep");
}

console.log("before sleep");
await sleep(10000);
console.log("after sleep");

for (let i = 0; i < page_size * 10; i++) {
const row = await rows.next();
assert.notEqual(row, null, `Row should not be null after sleep ${i.toString()} ${row}`);
}
//await conn.close();
});

Then("Drop result set should close it", async function () {
if (DRIVER_VERSION < [0, 30, 3]) {
console.log("SKIP");
return;
}
const dbName = "drop_result_set_js";
const n = (1n << 50n) + 1n;

const conn = await this.client.getConn();

await conn.exec(`create or replace database ${dbName}`);
await conn.exec(`use ${dbName}`);

const sql = `select * from numbers(${n.toString()})`;
let rows = await conn.queryIter(sql);

const firstRow = await rows.next();
assert.notEqual(firstRow, null);

await new Promise((resolve) => setTimeout(resolve, 1000));

const processSql = `select count(1) from system.processes where database = '${dbName}'`;
const processResult = await this.conn.queryRow(processSql);
assert.equal(processResult.values()[0], 1, `processResult = ${processResult.values()}`);

rows.close();

await new Promise((resolve) => setTimeout(resolve, 1000));

const finalResult = await this.conn.queryRow(processSql);
const finalCount = finalResult.values()[0];
assert.equal(finalCount, 0, `processResult = ${finalCount}`);
});
1 change: 1 addition & 0 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ databend-driver-core = { workspace = true }
tokio-stream = { workspace = true }

ctor = "0.2"
env_logger = "0.11.8"
http = "1.0"
once_cell = "1.21"
pyo3 = { version = "0.24.2", features = ["abi3-py37", "chrono"] }
Expand Down
1 change: 1 addition & 0 deletions bindings/python/package/databend_driver/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class Row:

class RowIterator:
def schema(self) -> Schema: ...
def close(self) -> None: ...
def __iter__(self) -> RowIterator: ...
def __next__(self) -> Row: ...
def __aiter__(self) -> RowIterator: ...
Expand Down
10 changes: 9 additions & 1 deletion bindings/python/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use crate::types::{ConnectionInfo, DriverError, Row, RowIterator, ServerStats, VERSION};
use crate::utils::{options_as_ref, to_sql_params, wait_for_future};
use databend_driver::{LoadMethod, SchemaRef};
use pyo3::exceptions::{PyAttributeError, PyStopIteration};
use pyo3::exceptions::{PyAttributeError, PyException, PyStopIteration};
use pyo3::types::{PyList, PyTuple};
use pyo3::{prelude::*, IntoPyObjectExt};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -251,6 +251,7 @@ pub struct BlockingDatabendCursor {
// buffer is used to store only the first row after execute
buffer: Vec<Row>,
schema: Option<SchemaRef>,
closed: bool,
}

impl BlockingDatabendCursor {
Expand All @@ -260,6 +261,7 @@ impl BlockingDatabendCursor {
rows: None,
buffer: Vec::new(),
schema: None,
closed: false,
}
}
}
Expand Down Expand Up @@ -313,6 +315,7 @@ impl BlockingDatabendCursor {
}

pub fn close(&mut self, py: Python) -> PyResult<()> {
self.closed = true;
self.reset();
wait_for_future(py, async move {
self.conn.close().await.map_err(DriverError::new)
Expand All @@ -330,6 +333,11 @@ impl BlockingDatabendCursor {
params: Option<Bound<'p, PyAny>>,
values: Option<Bound<'p, PyAny>>,
) -> PyResult<PyObject> {
if self.closed {
return Err(PyException::new_err(
"BlockingDatabendCursor already closed",
));
}
if let Some(values) = values {
return self.executemany(py, operation, [values].to_vec());
}
Expand Down
4 changes: 4 additions & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ fn _databend_driver(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<RowIterator>()?;
m.add_class::<ServerStats>()?;

env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
.target(env_logger::Target::Stdout)
.init();

// Register PEP-249 compliant exceptions
exceptions::register_exceptions(py, m)?;

Expand Down
8 changes: 8 additions & 0 deletions bindings/python/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ impl RowIterator {
Ok(Schema::new(ret))
}

pub fn close(&self, py: Python) -> PyResult<()> {
let streamer = self.0.clone();
wait_for_future(py, async move {
streamer.lock().await.close();
});
Ok(())
}

fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
Expand Down
Loading
Loading