Skip to content

Commit 54f3e4f

Browse files
committed
feat(FIR-42859): async query execution
1 parent 71bd51c commit 54f3e4f

File tree

12 files changed

+591
-38
lines changed

12 files changed

+591
-38
lines changed

src/connection/base.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ import { generateUserAgent } from "../common/util";
99
import { CompositeError } from "../common/errors";
1010
import JSONbig from "json-bigint";
1111
import { QueryFormatter } from "../formatter/base";
12+
import { AsyncStatement } from "../statement/async";
1213

1314
const defaultQuerySettings = {
1415
output_format: OutputFormat.COMPACT
1516
};
1617

17-
const defaultResponseSettings = {
18+
export const defaultResponseSettings = {
1819
normalizeData: false
1920
};
2021

@@ -160,6 +161,17 @@ export abstract class Connection {
160161
}
161162
}
162163

164+
abstract executeAsync(
165+
query: string,
166+
executeQueryOptions?: ExecuteQueryOptions
167+
): Promise<AsyncStatement>;
168+
169+
abstract isAsyncQueryRunning(token: string): Promise<boolean>;
170+
171+
abstract isAsyncQuerySuccessful(token: string): Promise<boolean | undefined>;
172+
173+
abstract cancelAsyncQuery(token: string): Promise<void>;
174+
163175
async execute(
164176
query: string,
165177
executeQueryOptions: ExecuteQueryOptions = {}
@@ -220,7 +232,7 @@ export abstract class Connection {
220232
}
221233
}
222234

223-
private async throwErrorIfErrorBody(text: string, response: Response) {
235+
protected async throwErrorIfErrorBody(text: string, response: Response) {
224236
// Hack, but looks like this is a limitation of the fetch API
225237
// In order to read the body here and elesewhere, we need to clone the response
226238
// since body can only be read once

src/connection/connection_v1.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { ACCOUNT, ACCOUNT_BY_NAME } from "../common/api";
22
import { Connection as BaseConnection } from "./base";
33
import { ResourceManager } from "../service";
4+
import { ExecuteQueryOptions } from "../types";
5+
import { AsyncStatement } from "../statement/async";
46

57
export interface AccountInfo {
68
id: string;
@@ -63,4 +65,32 @@ export class ConnectionV1 extends BaseConnection {
6365
async testConnection() {
6466
await this.execute("select 1");
6567
}
68+
69+
// Async methods
70+
async isAsyncQueryRunning(token: string): Promise<boolean> {
71+
throw new Error(
72+
"Asynchronous query running check is not supported in this Firebolt version."
73+
);
74+
}
75+
76+
async isAsyncQuerySuccessful(token: string): Promise<boolean | undefined> {
77+
throw new Error(
78+
"Asynchronous query success check is not supported in this Firebolt version."
79+
);
80+
}
81+
82+
async cancelAsyncQuery(token: string): Promise<void> {
83+
throw new Error(
84+
"Asynchronous query cancellation is not supported in this Firebolt version."
85+
);
86+
}
87+
88+
async executeAsync(
89+
query: string,
90+
executeQueryOptions?: ExecuteQueryOptions
91+
): Promise<AsyncStatement> {
92+
throw new Error(
93+
"Asynchronous execution is not supported in this Firebolt version."
94+
);
95+
}
6696
}

src/connection/connection_v2.ts

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { AccountNotFoundError, ApiError } from "../common/errors";
22
import { ACCOUNT_SYSTEM_ENGINE, QUERY_URL } from "../common/api";
33

4-
import { Connection as BaseConnection } from "./base";
4+
import { Connection as BaseConnection, defaultResponseSettings } from "./base";
55
import { Cache, inMemoryCache, noneCache } from "../common/tokenCache";
6+
import { ExecuteQueryOptions } from "../types";
7+
import { AsyncStatement } from "../statement/async";
68

79
export class ConnectionV2 extends BaseConnection {
810
private get account(): string {
@@ -70,6 +72,93 @@ export class ConnectionV2 extends BaseConnection {
7072
return this.engineEndpoint;
7173
}
7274

75+
// Async methods
76+
async executeAsync(
77+
query: string,
78+
executeQueryOptions: ExecuteQueryOptions = {}
79+
): Promise<AsyncStatement> {
80+
const { httpClient } = this.context;
81+
82+
executeQueryOptions.response = {
83+
...defaultResponseSettings,
84+
...(executeQueryOptions.response ?? {})
85+
};
86+
87+
const { parameters, namedParameters } = executeQueryOptions;
88+
89+
let formattedQuery: string;
90+
if (this.queryFormatter.isSetStatement(query)) {
91+
// can't have an async set query
92+
throw new Error("SET statements cannot be executed asynchronously.");
93+
} else {
94+
formattedQuery = this.queryFormatter.formatQuery(
95+
query,
96+
parameters,
97+
namedParameters
98+
);
99+
}
100+
101+
const body = formattedQuery;
102+
const asyncExecuteQueryOptions = {
103+
...executeQueryOptions,
104+
settings: {
105+
...executeQueryOptions.settings,
106+
async: true
107+
}
108+
};
109+
const url = this.getRequestUrl(asyncExecuteQueryOptions);
110+
111+
const request = httpClient.request<Response>("POST", url, {
112+
headers: { "user-agent": this.userAgent },
113+
body,
114+
raw: true
115+
});
116+
117+
const response = await request.ready();
118+
const text = await response.text();
119+
await this.throwErrorIfErrorBody(text, response);
120+
return new AsyncStatement(this.context, {
121+
query: formattedQuery,
122+
text,
123+
executeQueryOptions: asyncExecuteQueryOptions
124+
});
125+
}
126+
127+
private async getAsyncQueryInfo(token: string) {
128+
const query = `CALL fb_GetAsyncStatus('${token}')`;
129+
130+
const statement = await this.execute(query);
131+
const { data, meta } = await statement.fetchResult();
132+
const result: Record<string, any> = {};
133+
if (data.length > 0) {
134+
meta.forEach((field, index) => {
135+
result[field.name] = data[0][index];
136+
});
137+
} else {
138+
throw new Error("No data returned from fb_GetAsyncStatus");
139+
}
140+
return result;
141+
}
142+
143+
async isAsyncQueryRunning(token: string): Promise<boolean> {
144+
const info = await this.getAsyncQueryInfo(token);
145+
return info["status"] === "RUNNING";
146+
}
147+
148+
async isAsyncQuerySuccessful(token: string): Promise<boolean | undefined> {
149+
const info = await this.getAsyncQueryInfo(token);
150+
if (info["status"] === "RUNNING") {
151+
return undefined;
152+
}
153+
return info["status"] === "ENDED_SUCCESSFULLY";
154+
}
155+
156+
async cancelAsyncQuery(token: string): Promise<void> {
157+
const info = await this.getAsyncQueryInfo(token);
158+
const async_query_id = info["query_id"];
159+
this.execute(`CANCEL QUERY WHERE query_id='${async_query_id}'`);
160+
}
161+
73162
async testConnection() {
74163
const settings = { internal: [{ auto_start_stop_control: "ignore" }] };
75164
await this.execute("select 1", { settings });

src/http/node.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const DEFAULT_ERROR = "Server error";
2626
const DEFAULT_USER_AGENT = systemInfoString();
2727

2828
const PROTOCOL_VERSION_HEADER = "Firebolt-Protocol-Version";
29-
const PROTOCOL_VERSION = "2.1";
29+
const PROTOCOL_VERSION = "2.3";
3030
const createSocket = HttpsAgent.prototype.createSocket;
3131

3232
const agentOptions = {

src/statement/async.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import JSONbig from "json-bigint";
2+
import {
3+
ExecuteQueryOptions,
4+
StreamOptions,
5+
Context,
6+
Statistics
7+
} from "../types";
8+
import { Meta } from "../meta";
9+
import { BaseStatement } from "./baseStatement";
10+
11+
export class AsyncStatement extends BaseStatement {
12+
private asyncToken: string;
13+
14+
constructor(
15+
context: Context,
16+
{
17+
query,
18+
text,
19+
executeQueryOptions
20+
}: {
21+
query: string;
22+
text: string;
23+
executeQueryOptions: ExecuteQueryOptions;
24+
}
25+
) {
26+
super(context, { query, text, executeQueryOptions });
27+
this.asyncToken = this.parseResponse(this.text).token;
28+
}
29+
30+
private parseResponse(response: string) {
31+
const parsed = JSONbig.parse(response);
32+
const { token, message, monitorSql } = parsed;
33+
return {
34+
token,
35+
message,
36+
monitorSql
37+
};
38+
}
39+
get async_query_token(): string {
40+
return this.asyncToken;
41+
}
42+
43+
async streamResult(options?: StreamOptions) {
44+
throw new Error("Method not allowed for async statements");
45+
}
46+
47+
async fetchResult(): Promise<{
48+
data: any;
49+
meta: Meta[];
50+
statistics: Statistics | null;
51+
}> {
52+
throw new Error("Method not allowed for async statements");
53+
}
54+
}

src/statement/baseStatement.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import {
2+
ExecuteQueryOptions,
3+
Context,
4+
StreamOptions,
5+
Statistics
6+
} from "../types";
7+
import { Meta } from "../meta";
8+
9+
export abstract class BaseStatement {
10+
protected context: Context;
11+
protected query: string;
12+
protected executeQueryOptions: ExecuteQueryOptions;
13+
protected text: string;
14+
15+
constructor(
16+
context: Context,
17+
{
18+
query,
19+
text,
20+
executeQueryOptions
21+
}: {
22+
query: string;
23+
text: string;
24+
executeQueryOptions: ExecuteQueryOptions;
25+
}
26+
) {
27+
this.context = context;
28+
this.text = text;
29+
this.query = query;
30+
this.executeQueryOptions = executeQueryOptions;
31+
}
32+
33+
abstract streamResult(options?: StreamOptions): Promise<any>;
34+
abstract fetchResult(): Promise<{
35+
data: any;
36+
meta: Meta[];
37+
statistics: Statistics | null;
38+
}>;
39+
}

src/statement/index.ts

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,11 @@ import {
88
import { Meta } from "../meta";
99
import { isDataQuery } from "../common/util";
1010
import { RowStream } from "./stream/rowStream";
11-
import { JSONStream } from "./stream/jsonStream";
12-
import {
13-
normalizeResponse,
14-
getNormalizedStatistics
15-
} from "./normalizeResponse";
11+
import { normalizeResponse } from "./normalizeResponse";
1612
import { CompositeError } from "../common/errors";
13+
import { BaseStatement } from "./baseStatement";
1714

18-
export class Statement {
19-
private context: Context;
20-
private query: string;
21-
private executeQueryOptions: ExecuteQueryOptions;
22-
23-
private text;
15+
export class Statement extends BaseStatement {
2416
private rowStream: RowStream;
2517

2618
constructor(
@@ -35,10 +27,7 @@ export class Statement {
3527
executeQueryOptions: ExecuteQueryOptions;
3628
}
3729
) {
38-
this.context = context;
39-
this.text = text;
40-
this.query = query;
41-
this.executeQueryOptions = executeQueryOptions;
30+
super(context, { query, text, executeQueryOptions });
4231
this.rowStream = new RowStream();
4332
}
4433

src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export type QuerySettings = Record<
3535
> & {
3636
output_format?: OutputFormat;
3737
internal?: Record<string, string | number>[];
38+
async?: boolean;
3839
};
3940

4041
export type RowParser = (row: string, isLastRow: boolean) => any;

0 commit comments

Comments
 (0)