Skip to content

Commit a2e42ac

Browse files
authored
fix(duckdb-driver): Crash with high concurrency (unsuccessful or closed pending query result) (#7424)
Crash on high concurrency > Attempting to execute an unsuccessful or closed pending query result
1 parent d630243 commit a2e42ac

File tree

1 file changed

+36
-24
lines changed

1 file changed

+36
-24
lines changed

packages/cubejs-duckdb-driver/src/DuckDBDriver.ts

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export type DuckDBDriverConfiguration = {
2121
};
2222

2323
type InitPromise = {
24-
connection: Connection,
24+
defaultConnection: Connection,
2525
db: Database;
2626
};
2727

@@ -57,10 +57,12 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
5757
const token = getEnv('duckdbMotherDuckToken', this.config);
5858

5959
const db = new Database(token ? `md:?motherduck_token=${token}` : ':memory:');
60-
const connection = db.connect();
61-
60+
// Under the hood all methods of Database uses internal default connection, but there is no way to expose it
61+
const defaultConnection = db.connect();
62+
const execAsync: (sql: string, ...params: any[]) => Promise<void> = promisify(defaultConnection.exec).bind(defaultConnection) as any;
63+
6264
try {
63-
await this.handleQuery(connection, 'INSTALL httpfs', []);
65+
await execAsync('INSTALL httpfs');
6466
} catch (e) {
6567
if (this.logger) {
6668
console.error('DuckDB - error on httpfs installation', {
@@ -73,7 +75,7 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
7375
}
7476

7577
try {
76-
await this.handleQuery(connection, 'LOAD httpfs', []);
78+
await execAsync('LOAD httpfs');
7779
} catch (e) {
7880
if (this.logger) {
7981
console.error('DuckDB - error on loading httpfs', {
@@ -115,7 +117,7 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
115117
for (const { key, value } of configuration) {
116118
if (value) {
117119
try {
118-
await this.handleQuery(connection, `SET ${key}='${value}'`, []);
120+
await execAsync(`SET ${key}='${value}'`);
119121
} catch (e) {
120122
if (this.logger) {
121123
console.error(`DuckDB - error on configuration, key: ${key}`, {
@@ -128,7 +130,7 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
128130

129131
if (this.config.initSql) {
130132
try {
131-
await this.handleQuery(connection, this.config.initSql, []);
133+
await execAsync(this.config.initSql);
132134
} catch (e) {
133135
if (this.logger) {
134136
console.error('DuckDB - error on init sql (skipping)', {
@@ -139,7 +141,7 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
139141
}
140142

141143
return {
142-
connection,
144+
defaultConnection,
143145
db
144146
};
145147
}
@@ -164,14 +166,13 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
164166
return super.getSchemasQuery();
165167
}
166168

167-
protected async getConnection(): Promise<Connection> {
169+
protected async getInitiatedState(): Promise<InitPromise> {
168170
if (!this.initPromise) {
169171
this.initPromise = this.init();
170172
}
171173

172174
try {
173-
const { connection } = await this.initPromise;
174-
return connection;
175+
return await this.initPromise;
175176
} catch (e) {
176177
this.initPromise = null;
177178

@@ -183,15 +184,11 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
183184
return DuckDBQuery;
184185
}
185186

186-
protected handleQuery<R>(connection: Connection, query: string, values: unknown[] = [], _options?: QueryOptions): Promise<R[]> {
187-
const executeQuery: (sql: string, ...args: any[]) => Promise<R[]> = promisify(connection.all).bind(connection) as any;
188-
189-
return executeQuery(query, ...values);
190-
}
191-
192187
public async query<R = unknown>(query: string, values: unknown[] = [], _options?: QueryOptions): Promise<R[]> {
193-
const result = await this.handleQuery<R>(await this.getConnection(), query, values, _options);
188+
const { defaultConnection } = await this.getInitiatedState();
189+
const fetchAsync: (sql: string, ...params: any[]) => Promise<R[]> = promisify(defaultConnection.all).bind(defaultConnection) as any;
194190

191+
const result = await fetchAsync(query, ...values);
195192
return result.map((item) => {
196193
transformRow(item);
197194

@@ -204,14 +201,29 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
204201
values: unknown[],
205202
{ highWaterMark }: StreamOptions
206203
): Promise<StreamTableData> {
207-
const connection = await this.getConnection();
204+
const { db } = await this.getInitiatedState();
208205

209-
const asyncIterator = connection.stream(query, ...(values || []));
210-
const rowStream = stream.Readable.from(asyncIterator, { highWaterMark }).pipe(new HydrationStream());
206+
// new connection, because stream can break with
207+
// Attempting to execute an unsuccessful or closed pending query result
208+
// PreAggregation queue has a concurrency limit, it's why pool is not needed here
209+
const connection = db.connect();
210+
const closeAsync = promisify(connection.close).bind(connection);
211211

212-
return {
213-
rowStream,
214-
};
212+
try {
213+
const asyncIterator = connection.stream(query, ...(values || []));
214+
const rowStream = stream.Readable.from(asyncIterator, { highWaterMark }).pipe(new HydrationStream());
215+
216+
return {
217+
rowStream,
218+
release: async () => {
219+
await closeAsync();
220+
}
221+
};
222+
} catch (e) {
223+
await closeAsync();
224+
225+
throw e;
226+
}
215227
}
216228

217229
public async testConnection(): Promise<void> {

0 commit comments

Comments
 (0)