Skip to content

Commit b41bb04

Browse files
authored
feat: add heartbeat to avoid unexpected timeout. (#697)
* feat: add heartbeat to avoid unexpected timeout. * add unit test for ClientManager.
1 parent fc5689d commit b41bb04

File tree

25 files changed

+870
-94
lines changed

25 files changed

+870
-94
lines changed

bindings/nodejs/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ doc = false
1515
[dependencies]
1616
chrono = { workspace = true }
1717
databend-driver = { workspace = true, features = ["rustls", "flight-sql"] }
18+
env_logger = "0.11.8"
1819
tokio-stream = { workspace = true }
1920

2021
napi = { version = "2.16", default-features = false, features = [

bindings/nodejs/generated.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,14 +326,13 @@ if (!nativeBinding) {
326326
throw new Error(`Failed to load native binding`)
327327
}
328328

329-
const { ValueOptions, Client, Connection, ConnectionInfo, Schema, Field, RowIterator, RowIteratorExt, RowOrStats, Row, ServerStats } = nativeBinding
329+
const { ValueOptions, Client, Connection, ConnectionInfo, Schema, RowIterator, RowIteratorExt, RowOrStats, Row, ServerStats } = nativeBinding
330330

331331
module.exports.ValueOptions = ValueOptions
332332
module.exports.Client = Client
333333
module.exports.Connection = Connection
334334
module.exports.ConnectionInfo = ConnectionInfo
335335
module.exports.Schema = Schema
336-
module.exports.Field = Field
337336
module.exports.RowIterator = RowIterator
338337
module.exports.RowIteratorExt = RowIteratorExt
339338
module.exports.RowOrStats = RowOrStats

bindings/nodejs/index.d.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
/* auto-generated by NAPI-RS */
2121

22+
export interface Field {
23+
name: string
24+
dataType: string
25+
}
2226
export declare class ValueOptions {
2327
variantAsObject: boolean
2428
}
@@ -58,6 +62,8 @@ export declare class Connection {
5862
* The SQL can be `INSERT INTO tbl VALUES` or `REPLACE INTO tbl VALUES`.
5963
*/
6064
loadFile(sql: string, file: string, method?: string | undefined | null): Promise<ServerStats>
65+
/** Close the Connection and release resources. */
66+
close(): Promise<void>
6167
}
6268
export declare class ConnectionInfo {
6369
get handler(): string
@@ -70,13 +76,10 @@ export declare class ConnectionInfo {
7076
export declare class Schema {
7177
fields(): Array<Field>
7278
}
73-
export declare class Field {
74-
get name(): string
75-
get dataType(): string
76-
}
7779
export declare class RowIterator {
7880
/** Get Schema for rows. */
7981
schema(): Schema
82+
close(): void
8083
/**
8184
* Fetch next row.
8285
* Returns `None` if there are no more rows.
@@ -90,6 +93,7 @@ export declare class RowIterator {
9093
}
9194
export declare class RowIteratorExt {
9295
schema(): Schema
96+
close(): void
9397
/**
9498
* Fetch next row or stats.
9599
* Returns `None` if there are no more rows.

bindings/nodejs/src/lib.rs

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use databend_driver::LoadMethod;
2020
use napi::{bindgen_prelude::*, Env};
2121
use once_cell::sync::Lazy;
2222
use std::str::FromStr;
23+
use std::sync::Arc;
2324
use std::{collections::HashMap, path::Path};
2425
use tokio_stream::StreamExt;
2526

@@ -75,15 +76,18 @@ impl Client {
7576

7677
#[napi]
7778
pub struct Connection {
78-
inner: databend_driver::Connection,
79+
inner: Arc<databend_driver::Connection>,
7980
opts: ValueOptions,
8081
}
8182

8283
pub type Params = serde_json::Value;
8384

8485
impl Connection {
8586
pub fn new(inner: databend_driver::Connection, opts: ValueOptions) -> Self {
86-
Self { inner, opts }
87+
Self {
88+
inner: Arc::new(inner),
89+
opts,
90+
}
8791
}
8892
}
8993

@@ -169,7 +173,11 @@ impl Connection {
169173
self.inner.query_iter(&sql).await
170174
};
171175
let iterator = iterator.map_err(format_napi_error)?;
172-
Ok(RowIterator::new(iterator, self.opts.clone()))
176+
Ok(RowIterator::new(
177+
iterator,
178+
self.opts.clone(),
179+
self.inner.clone(),
180+
))
173181
}
174182

175183
/// Execute a SQL query, and return all rows with schema and stats.
@@ -185,7 +193,11 @@ impl Connection {
185193
self.inner.query_iter_ext(&sql).await
186194
};
187195
let iterator = iterator.map_err(format_napi_error)?;
188-
Ok(RowIteratorExt::new(iterator, self.opts.clone()))
196+
Ok(RowIteratorExt::new(
197+
iterator,
198+
self.opts.clone(),
199+
self.inner.clone(),
200+
))
189201
}
190202

191203
/// Load data with stage attachment.
@@ -415,11 +427,16 @@ pub struct Field {
415427
pub struct RowIterator {
416428
inner: databend_driver::RowIterator,
417429
opts: ValueOptions,
430+
_conn: Arc<databend_driver::Connection>,
418431
}
419432

420433
impl RowIterator {
421-
pub fn new(inner: databend_driver::RowIterator, opts: ValueOptions) -> Self {
422-
Self { inner, opts }
434+
pub fn new(
435+
inner: databend_driver::RowIterator,
436+
opts: ValueOptions,
437+
_conn: Arc<databend_driver::Connection>,
438+
) -> Self {
439+
Self { inner, opts, _conn }
423440
}
424441
}
425442

@@ -431,6 +448,12 @@ impl RowIterator {
431448
Schema(self.inner.schema().clone())
432449
}
433450

451+
#[napi]
452+
#[allow(clippy::missing_safety_doc)]
453+
pub unsafe fn close(&mut self) {
454+
self.inner.close()
455+
}
456+
434457
/// Fetch next row.
435458
/// Returns `None` if there are no more rows.
436459
#[napi]
@@ -454,11 +477,16 @@ impl RowIterator {
454477
pub struct RowIteratorExt {
455478
inner: databend_driver::RowStatsIterator,
456479
opts: ValueOptions,
480+
_conn: Arc<databend_driver::Connection>,
457481
}
458482

459483
impl RowIteratorExt {
460-
pub fn new(inner: databend_driver::RowStatsIterator, opts: ValueOptions) -> Self {
461-
Self { inner, opts }
484+
pub fn new(
485+
inner: databend_driver::RowStatsIterator,
486+
opts: ValueOptions,
487+
_conn: Arc<databend_driver::Connection>,
488+
) -> Self {
489+
Self { inner, opts, _conn }
462490
}
463491
}
464492

@@ -469,6 +497,12 @@ impl RowIteratorExt {
469497
Schema(self.inner.schema().clone())
470498
}
471499

500+
#[napi]
501+
#[allow(clippy::missing_safety_doc)]
502+
pub unsafe fn close(&mut self) {
503+
self.inner.close()
504+
}
505+
472506
/// Fetch next row or stats.
473507
/// Returns `None` if there are no more rows.
474508
#[napi]
@@ -608,3 +642,10 @@ impl ServerStats {
608642
fn format_napi_error(err: databend_driver::Error) -> Error {
609643
Error::from_reason(format!("{err}"))
610644
}
645+
646+
#[ctor]
647+
fn init_logger() {
648+
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
649+
.target(env_logger::Target::Stdout)
650+
.init();
651+
}

bindings/nodejs/tests/binding.js

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,25 @@ const { finished, pipeline } = require("node:stream/promises");
1919

2020
const assert = require("assert");
2121
const { Given, When, Then } = require("@cucumber/cucumber");
22+
const util = require("util");
23+
const sleep = util.promisify(setTimeout);
24+
25+
let DB_VERSION = process.env.DB_VERSION;
26+
if (DB_VERSION) {
27+
DB_VERSION = DB_VERSION.split(".").map(Number);
28+
} else {
29+
DB_VERSION = [100, 0, 0];
30+
}
31+
32+
let DRIVER_VERSION = process.env.DRIVER_VERSION;
33+
if (DRIVER_VERSION) {
34+
DRIVER_VERSION = DRIVER_VERSION.split(".").map(Number);
35+
} else {
36+
DRIVER_VERSION = [100, 0, 0];
37+
}
38+
39+
process.env.DATABEND_DRIVER_HEARTBEAT_INTERVAL_SECONDS = "1";
40+
process.env.RUST_LOG = "warn,databend_driver=debug,databend_client=debug";
2241

2342
const { Client } = require("../index.js");
2443

@@ -356,7 +375,7 @@ Then("Load file with Streaming and Select should be equal", async function () {
356375
assert.deepEqual(ret, expected);
357376
});
358377

359-
Then("Temp table should work with cluster", async function () {
378+
Then("Temp table is cleaned up when conn is dropped", async function () {
360379
await this.conn.exec(`create or replace temp table temp_1(a int)`);
361380
await this.conn.exec(`INSERT INTO temp_1 VALUES (1),(2)`);
362381

@@ -418,6 +437,74 @@ Then("killQuery should return error for non-existent query ID", async function (
418437
// Should get an error for non-existent query
419438
assert.ok(err instanceof Error, "Should throw an Error object");
420439
assert.ok(typeof err.message === "string" && err.message.length > 0, "Should return meaningful error message");
421-
console.log("Expected error for non-existent query:", err.message);
440+
// console.log("Expected error for non-existent query:", err.message);
422441
}
423442
});
443+
444+
Then("Query should not timeout", { timeout: 30000 }, async function () {
445+
if (!(DRIVER_VERSION > [0, 30, 3] && DB_VERSION >= [1, 2, 709])) {
446+
console.log("SKIP");
447+
return;
448+
}
449+
const page_size = 10000;
450+
451+
const dsn = `databend://root:@localhost:8000/?sslmode=disable&wait_time_secs=3&max_rows_per_page=${page_size.toString()}`;
452+
const client = new Client(dsn);
453+
454+
const conn = await client.getConn();
455+
await conn.exec("set http_handler_result_timeout_secs=3");
456+
const row = await conn.queryRow("show settings like 'http_handler_result_timeout_secs'");
457+
assert.equal(row.values()[1], "3");
458+
459+
const sql = "select * from numbers(1000000000)";
460+
const rows = await conn.queryIter(sql);
461+
462+
for (let i = 0; i < page_size; i++) {
463+
const row = await rows.next();
464+
assert.notEqual(row, null, "Row should not be null before sleep");
465+
}
466+
467+
console.log("before sleep");
468+
await sleep(10000);
469+
console.log("after sleep");
470+
471+
for (let i = 0; i < page_size * 10; i++) {
472+
const row = await rows.next();
473+
assert.notEqual(row, null, `Row should not be null after sleep ${i.toString()} ${row}`);
474+
}
475+
//await conn.close();
476+
});
477+
478+
Then("Drop result set should close it", async function () {
479+
if (DRIVER_VERSION < [0, 30, 3]) {
480+
console.log("SKIP");
481+
return;
482+
}
483+
const dbName = "drop_result_set_js";
484+
const n = (1n << 50n) + 1n;
485+
486+
const conn = await this.client.getConn();
487+
488+
await conn.exec(`create or replace database ${dbName}`);
489+
await conn.exec(`use ${dbName}`);
490+
491+
const sql = `select * from numbers(${n.toString()})`;
492+
let rows = await conn.queryIter(sql);
493+
494+
const firstRow = await rows.next();
495+
assert.notEqual(firstRow, null);
496+
497+
await new Promise((resolve) => setTimeout(resolve, 1000));
498+
499+
const processSql = `select count(1) from system.processes where database = '${dbName}'`;
500+
const processResult = await this.conn.queryRow(processSql);
501+
assert.equal(processResult.values()[0], 1, `processResult = ${processResult.values()}`);
502+
503+
rows.close();
504+
505+
await new Promise((resolve) => setTimeout(resolve, 1000));
506+
507+
const finalResult = await this.conn.queryRow(processSql);
508+
const finalCount = finalResult.values()[0];
509+
assert.equal(finalCount, 0, `processResult = ${finalCount}`);
510+
});

bindings/python/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ databend-driver-core = { workspace = true }
2020
tokio-stream = { workspace = true }
2121

2222
ctor = "0.2"
23+
env_logger = "0.11.8"
2324
http = "1.0"
2425
once_cell = "1.21"
2526
pyo3 = { version = "0.24.2", features = ["abi3-py37", "chrono"] }

bindings/python/package/databend_driver/__init__.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class Row:
7575

7676
class RowIterator:
7777
def schema(self) -> Schema: ...
78+
def close(self) -> None: ...
7879
def __iter__(self) -> RowIterator: ...
7980
def __next__(self) -> Row: ...
8081
def __aiter__(self) -> RowIterator: ...

bindings/python/src/blocking.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::sync::Arc;
2020
use crate::types::{ConnectionInfo, DriverError, Row, RowIterator, ServerStats, VERSION};
2121
use crate::utils::{options_as_ref, to_sql_params, wait_for_future};
2222
use databend_driver::{LoadMethod, SchemaRef};
23-
use pyo3::exceptions::{PyAttributeError, PyStopIteration};
23+
use pyo3::exceptions::{PyAttributeError, PyException, PyStopIteration};
2424
use pyo3::types::{PyList, PyTuple};
2525
use pyo3::{prelude::*, IntoPyObjectExt};
2626
use tokio::sync::Mutex;
@@ -251,6 +251,7 @@ pub struct BlockingDatabendCursor {
251251
// buffer is used to store only the first row after execute
252252
buffer: Vec<Row>,
253253
schema: Option<SchemaRef>,
254+
closed: bool,
254255
}
255256

256257
impl BlockingDatabendCursor {
@@ -260,6 +261,7 @@ impl BlockingDatabendCursor {
260261
rows: None,
261262
buffer: Vec::new(),
262263
schema: None,
264+
closed: false,
263265
}
264266
}
265267
}
@@ -313,6 +315,7 @@ impl BlockingDatabendCursor {
313315
}
314316

315317
pub fn close(&mut self, py: Python) -> PyResult<()> {
318+
self.closed = true;
316319
self.reset();
317320
wait_for_future(py, async move {
318321
self.conn.close().await.map_err(DriverError::new)
@@ -330,6 +333,11 @@ impl BlockingDatabendCursor {
330333
params: Option<Bound<'p, PyAny>>,
331334
values: Option<Bound<'p, PyAny>>,
332335
) -> PyResult<PyObject> {
336+
if self.closed {
337+
return Err(PyException::new_err(
338+
"BlockingDatabendCursor already closed",
339+
));
340+
}
333341
if let Some(values) = values {
334342
return self.executemany(py, operation, [values].to_vec());
335343
}

bindings/python/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ fn _databend_driver(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
3838
m.add_class::<RowIterator>()?;
3939
m.add_class::<ServerStats>()?;
4040

41+
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
42+
.target(env_logger::Target::Stdout)
43+
.init();
44+
4145
// Register PEP-249 compliant exceptions
4246
exceptions::register_exceptions(py, m)?;
4347

bindings/python/src/types.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,14 @@ impl RowIterator {
255255
Ok(Schema::new(ret))
256256
}
257257

258+
pub fn close(&self, py: Python) -> PyResult<()> {
259+
let streamer = self.0.clone();
260+
wait_for_future(py, async move {
261+
streamer.lock().await.close();
262+
});
263+
Ok(())
264+
}
265+
258266
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
259267
slf
260268
}

0 commit comments

Comments
 (0)