Skip to content

Commit 0effa90

Browse files
committed
[FIXUP] pass abort signal deeper
1 parent 6e4679f commit 0effa90

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
170170
this.client = this.createClient(maxPoolSize);
171171
}
172172

173-
protected withCancel<T>(fn: (con: ClickHouseClient, queryId: string) => Promise<T>): Promise<T> {
173+
protected withCancel<T>(fn: (con: ClickHouseClient, queryId: string, signal: AbortSignal) => Promise<T>): Promise<T> {
174174
console.log("withCancel call");
175175
const queryId = uuidv4();
176176

@@ -180,12 +180,10 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
180180
const promise = (async () => {
181181
await this.client.ping();
182182
signal.throwIfAborted();
183-
// TODO pass signal deeper, new driver supports abort signal, but does not do autokill
184-
const result = await fn(this.client, queryId);
183+
const result = await fn(this.client, queryId, signal);
185184
signal.throwIfAborted();
186185
return result;
187186
})();
188-
// TODO why do we need this?
189187
(promise as any).cancel = async () => {
190188
abortController.abort();
191189
// Use separate client for kill query, usual pool may be busy
@@ -269,6 +267,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
269267
query_id: queryId,
270268
format: 'JSON',
271269
clickhouse_settings: this.config.clickhouseSettings,
270+
abort_signal: signal,
272271
});
273272
console.log("queryResponse resultSet", query, resultSet.query_id, resultSet.response_headers);
274273

@@ -652,20 +651,24 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
652651

653652
// This is not part of a driver interface, and marked public only for testing
654653
public async command(query: string): Promise<void> {
655-
await this.withCancel(async (connection) => {
654+
await this.withCancel(async (connection, queryId, signal) => {
656655
await connection.command({
657656
query,
657+
query_id: queryId,
658+
abort_signal: signal,
658659
});
659660
});
660661
}
661662

662663
// This is not part of a driver interface, and marked public only for testing
663664
public async insert(table: string, values: Array<Array<unknown>>): Promise<void> {
664-
await this.withCancel(async (connection) => {
665+
await this.withCancel(async (connection, queryId, signal) => {
665666
await connection.insert({
666667
table,
667668
values,
668669
format: 'JSONCompactEachRow',
670+
query_id: queryId,
671+
abort_signal: signal,
669672
});
670673
});
671674
}

0 commit comments

Comments
 (0)