Skip to content

Commit 3426e6e

Browse files
authored
fix(jdbc-driver): streamQuery connection release (#6108)
1 parent d9d4205 commit 3426e6e

File tree

1 file changed

+30
-42
lines changed

1 file changed

+30
-42
lines changed

packages/cubejs-jdbc-driver/src/JDBCDriver.ts

Lines changed: 30 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -225,58 +225,46 @@ export class JDBCDriver extends BaseDriver {
225225
}
226226

227227
public async streamQuery(sql: string, values: string[]): Promise<Readable> {
228+
const conn = await this.pool.acquire();
228229
const query = applyParams(sql, values);
229230
const cancelObj: {cancel?: Function} = {};
230231
try {
231-
const conn = await this.pool.acquire();
232-
try {
233-
const createStatement = promisify(conn.createStatement.bind(conn));
234-
const statement = await createStatement();
232+
const createStatement = promisify(conn.createStatement.bind(conn));
233+
const statement = await createStatement();
235234

236-
// TODO (buntarb): this does not make any sense...
237-
// statement.setFetchSize(
238-
// getEnv('dbQueryStreamHighWaterMark'),
239-
// (err: unknown) => { if (err) console.error(err); }
240-
// );
241-
242-
if (cancelObj) {
243-
cancelObj.cancel = promisify(statement.cancel.bind(statement));
244-
}
235+
if (cancelObj) {
236+
cancelObj.cancel = promisify(statement.cancel.bind(statement));
237+
}
245238

246-
// TODO (buntarb): timeout decision needs.
247-
// const setQueryTimeout = promisify(statement.setQueryTimeout.bind(statement));
248-
// await setQueryTimeout(600);
249-
250-
const executeQuery = promisify(statement.execute.bind(statement));
251-
const resultSet = await executeQuery(query);
252-
return new Promise((resolve, reject) => {
253-
resultSet.toObjectIter(
254-
(
255-
err: unknown,
256-
res: {
239+
const executeQuery = promisify(statement.execute.bind(statement));
240+
const resultSet = await executeQuery(query);
241+
return new Promise((resolve, reject) => {
242+
resultSet.toObjectIter(
243+
(
244+
err: unknown,
245+
res: {
257246
labels: string[],
258247
types: number[],
259248
rows: { next: nextFn },
260249
},
261-
) => {
262-
if (err) reject(err);
263-
const rowsStream = new QueryStream(res.rows.next);
264-
const cleanup = (e?: Error) => {
265-
if (!rowsStream.destroyed) {
266-
rowsStream.destroy(e);
267-
}
268-
};
269-
rowsStream.once('end', cleanup);
270-
rowsStream.once('error', cleanup);
271-
rowsStream.once('close', cleanup);
272-
resolve(rowsStream);
273-
}
274-
);
275-
});
276-
} finally {
277-
await this.pool.release(conn);
278-
}
250+
) => {
251+
if (err) reject(err);
252+
const rowsStream = new QueryStream(res.rows.next);
253+
const cleanup = (e?: Error) => {
254+
if (!rowsStream.destroyed) {
255+
this.pool.release(conn);
256+
rowsStream.destroy(e);
257+
}
258+
};
259+
rowsStream.once('end', cleanup);
260+
rowsStream.once('error', cleanup);
261+
rowsStream.once('close', cleanup);
262+
resolve(rowsStream);
263+
}
264+
);
265+
});
279266
} catch (ex: any) {
267+
await this.pool.release(conn);
280268
if (ex.cause) {
281269
throw new Error(ex.cause.getMessageSync());
282270
} else {

0 commit comments

Comments
 (0)