diff --git a/README.md b/README.md
index ce875a6b..d27ea862 100644
--- a/README.md
+++ b/README.md
@@ -78,6 +78,10 @@ console.log(rows)
* Fetch result
* Stream result
* Result hydration
+ * Server-side async queries
+ * Execute Async Query
+ * Check Async Query Status
+ * Cancel Async Query
* Engine management
* getByName
* Engine
@@ -340,6 +344,44 @@ firebolt-sdk maps SQL data types to their corresponding JavaScript equivalents.
| | STRING | String | |
| Date & Time | DATE | Date | |
+
+## Server-side async query execution
+
+Firebolt supports server-side asynchronous query execution. This feature allows you to run
+queries in the background and fetch the results later. This is especially useful for long-running
+queries that you don't want to wait for or maintain a persistent connection to the server.
+
+
+### Execute Async Query
+
+Executes a query asynchronously. This is useful for long-running queries that you don't want to block the main thread. The resulting statement does not contain data and should only be used to receive an async query token. Token can be saved elsewhere and reused, even on a new connection to check on this query.
+
+```typescript
+const statement = await connection.executeAsync(query, executeQueryOptions);
+const token = statement.asyncQueryToken; // used to check query status and cancel it
+// statement.fetchResult() -- not allowed as there's no result to fetch
+```
+
+
+### Check Async Query Status
+
+Checks the status of an asynchronous query. Use this to determine if the query is still running or has completed. `isAsyncQueryRunning` woudl return true or false if the query is running or has finished. `isAsyncQuerySuccessful` would return true if the query has completed successfully, false if it has failed and `undefined` if the query is still running.
+
+```typescript
+const token = statement.asyncQueryToken; // can only be fetched for async query
+const isRunning = await connection.isAsyncQueryRunning(token);
+const isSuccessful = await connection.isAsyncQuerySuccessful(token);
+```
+
+
+### Cancel Async Query
+
+Cancels a running asynchronous query. Use this if you need to stop a long-running query, if its execution is no longer needed.
+
+```typescript
+const token = statement.asyncQueryToken; // can only be fetched for async query
+await connection.cancelAsyncQuery(token);
+```
### Engine management
diff --git a/src/connection/base.ts b/src/connection/base.ts
index 99b17a8d..bb2132f5 100644
--- a/src/connection/base.ts
+++ b/src/connection/base.ts
@@ -9,12 +9,13 @@ import { generateUserAgent } from "../common/util";
import { CompositeError } from "../common/errors";
import JSONbig from "json-bigint";
import { QueryFormatter } from "../formatter/base";
+import { AsyncStatement } from "../statement/async";
const defaultQuerySettings = {
output_format: OutputFormat.COMPACT
};
-const defaultResponseSettings = {
+export const defaultResponseSettings = {
normalizeData: false
};
@@ -160,10 +161,21 @@ export abstract class Connection {
}
}
- async execute(
+ abstract executeAsync(
query: string,
- executeQueryOptions: ExecuteQueryOptions = {}
- ): Promise {
+ executeQueryOptions?: ExecuteQueryOptions
+ ): Promise;
+
+ abstract isAsyncQueryRunning(token: string): Promise;
+
+ abstract isAsyncQuerySuccessful(token: string): Promise;
+
+ abstract cancelAsyncQuery(token: string): Promise;
+
+ protected async prepareAndExecuteQuery(
+ query: string,
+ executeQueryOptions: ExecuteQueryOptions
+ ): Promise<{ formattedQuery: string; response: Response }> {
const { httpClient } = this.context;
executeQueryOptions.response = {
@@ -201,14 +213,8 @@ export abstract class Connection {
try {
const response = await request.ready();
- const text = await response.text();
await this.processHeaders(response.headers);
- await this.throwErrorIfErrorBody(text, response);
- return new Statement(this.context, {
- query: formattedQuery,
- text,
- executeQueryOptions
- });
+ return { formattedQuery, response };
} catch (error) {
// In case it was a set query, remove set parameter if query fails
if (setKey.length > 0) {
@@ -220,7 +226,25 @@ export abstract class Connection {
}
}
- private async throwErrorIfErrorBody(text: string, response: Response) {
+ async execute(
+ query: string,
+ executeQueryOptions: ExecuteQueryOptions = {}
+ ): Promise {
+ const { formattedQuery, response } = await this.prepareAndExecuteQuery(
+ query,
+ executeQueryOptions
+ );
+
+ const text = await response.text();
+ await this.throwErrorIfErrorBody(text, response);
+ return new Statement(this.context, {
+ query: formattedQuery,
+ text,
+ executeQueryOptions
+ });
+ }
+
+ protected async throwErrorIfErrorBody(text: string, response: Response) {
// Hack, but looks like this is a limitation of the fetch API
// In order to read the body here and elesewhere, we need to clone the response
// since body can only be read once
diff --git a/src/connection/connection_v1.ts b/src/connection/connection_v1.ts
index 9aee3687..9083599f 100644
--- a/src/connection/connection_v1.ts
+++ b/src/connection/connection_v1.ts
@@ -1,6 +1,8 @@
import { ACCOUNT, ACCOUNT_BY_NAME } from "../common/api";
import { Connection as BaseConnection } from "./base";
import { ResourceManager } from "../service";
+import { ExecuteQueryOptions } from "../types";
+import { AsyncStatement } from "../statement/async";
export interface AccountInfo {
id: string;
@@ -63,4 +65,32 @@ export class ConnectionV1 extends BaseConnection {
async testConnection() {
await this.execute("select 1");
}
+
+ // Async methods
+ async isAsyncQueryRunning(token: string): Promise {
+ throw new Error(
+ "Asynchronous query running check is not supported in this Firebolt version."
+ );
+ }
+
+ async isAsyncQuerySuccessful(token: string): Promise {
+ throw new Error(
+ "Asynchronous query success check is not supported in this Firebolt version."
+ );
+ }
+
+ async cancelAsyncQuery(token: string): Promise {
+ throw new Error(
+ "Asynchronous query cancellation is not supported in this Firebolt version."
+ );
+ }
+
+ async executeAsync(
+ query: string,
+ executeQueryOptions?: ExecuteQueryOptions
+ ): Promise {
+ throw new Error(
+ "Asynchronous execution is not supported in this Firebolt version."
+ );
+ }
}
diff --git a/src/connection/connection_v2.ts b/src/connection/connection_v2.ts
index d4c1bea7..b996b9c4 100644
--- a/src/connection/connection_v2.ts
+++ b/src/connection/connection_v2.ts
@@ -3,6 +3,8 @@ import { ACCOUNT_SYSTEM_ENGINE, QUERY_URL } from "../common/api";
import { Connection as BaseConnection } from "./base";
import { Cache, inMemoryCache, noneCache } from "../common/tokenCache";
+import { ExecuteQueryOptions } from "../types";
+import { AsyncStatement } from "../statement/async";
export class ConnectionV2 extends BaseConnection {
private get account(): string {
@@ -70,6 +72,72 @@ export class ConnectionV2 extends BaseConnection {
return this.engineEndpoint;
}
+ // Async methods
+ async executeAsync(
+ query: string,
+ executeQueryOptions: ExecuteQueryOptions = {}
+ ): Promise {
+ const asyncExecuteQueryOptions = {
+ ...executeQueryOptions,
+ settings: {
+ ...executeQueryOptions.settings,
+ async: true
+ }
+ };
+
+ if (this.queryFormatter.isSetStatement(query)) {
+ // can't have an async set query
+ throw new Error("SET statements cannot be executed asynchronously.");
+ }
+ const { formattedQuery, response } = await this.prepareAndExecuteQuery(
+ query,
+ asyncExecuteQueryOptions
+ );
+
+ const text = await response.text();
+ await this.throwErrorIfErrorBody(text, response);
+ return new AsyncStatement(this.context, {
+ query: formattedQuery,
+ text,
+ executeQueryOptions: asyncExecuteQueryOptions
+ });
+ }
+
+ private async getAsyncQueryInfo(token: string) {
+ const query = `CALL fb_GetAsyncStatus('${token}')`;
+
+ const statement = await this.execute(query);
+ const { data, meta } = await statement.fetchResult();
+ const result: Record = {};
+ if (data.length > 0) {
+ meta.forEach((field, index) => {
+ result[field.name] = data[0][index];
+ });
+ } else {
+ throw new Error("No data returned from fb_GetAsyncStatus");
+ }
+ return result;
+ }
+
+ async isAsyncQueryRunning(token: string): Promise {
+ const info = await this.getAsyncQueryInfo(token);
+ return info["status"] === "RUNNING";
+ }
+
+ async isAsyncQuerySuccessful(token: string): Promise {
+ const info = await this.getAsyncQueryInfo(token);
+ if (info["status"] === "RUNNING") {
+ return undefined;
+ }
+ return info["status"] === "ENDED_SUCCESSFULLY";
+ }
+
+ async cancelAsyncQuery(token: string): Promise {
+ const info = await this.getAsyncQueryInfo(token);
+ const async_query_id = info["query_id"];
+ this.execute(`CANCEL QUERY WHERE query_id='${async_query_id}'`);
+ }
+
async testConnection() {
const settings = { internal: [{ auto_start_stop_control: "ignore" }] };
await this.execute("select 1", { settings });
diff --git a/src/http/node.ts b/src/http/node.ts
index 735b087e..2905fef3 100644
--- a/src/http/node.ts
+++ b/src/http/node.ts
@@ -26,7 +26,7 @@ const DEFAULT_ERROR = "Server error";
const DEFAULT_USER_AGENT = systemInfoString();
const PROTOCOL_VERSION_HEADER = "Firebolt-Protocol-Version";
-const PROTOCOL_VERSION = "2.1";
+const PROTOCOL_VERSION = "2.3";
const createSocket = HttpsAgent.prototype.createSocket;
const agentOptions = {
diff --git a/src/statement/async.ts b/src/statement/async.ts
new file mode 100644
index 00000000..ab45f105
--- /dev/null
+++ b/src/statement/async.ts
@@ -0,0 +1,60 @@
+import JSONbig from "json-bigint";
+import {
+ ExecuteQueryOptions,
+ StreamOptions,
+ Context,
+ Statistics
+} from "../types";
+import { Meta } from "../meta";
+
+export class AsyncStatement {
+ private readonly asyncToken: string;
+ private readonly context: Context;
+ private readonly query: string;
+ private readonly executeQueryOptions: ExecuteQueryOptions;
+ private readonly text: string;
+
+ constructor(
+ context: Context,
+ {
+ query,
+ text,
+ executeQueryOptions
+ }: {
+ query: string;
+ text: string;
+ executeQueryOptions: ExecuteQueryOptions;
+ }
+ ) {
+ this.context = context;
+ this.text = text;
+ this.query = query;
+ this.executeQueryOptions = executeQueryOptions;
+ this.asyncToken = this.parseResponse(this.text).token;
+ }
+
+ private parseResponse(response: string) {
+ const parsed = JSONbig.parse(response);
+ const { token, message, monitorSql } = parsed;
+ return {
+ token,
+ message,
+ monitorSql
+ };
+ }
+ get asyncQueryToken(): string {
+ return this.asyncToken;
+ }
+
+ async streamResult(options?: StreamOptions) {
+ throw new Error("Method not allowed for async statements");
+ }
+
+ async fetchResult(): Promise<{
+ data: any;
+ meta: Meta[];
+ statistics: Statistics | null;
+ }> {
+ throw new Error("Method not allowed for async statements");
+ }
+}
diff --git a/src/statement/index.ts b/src/statement/index.ts
index 52d59741..428c2f6a 100644
--- a/src/statement/index.ts
+++ b/src/statement/index.ts
@@ -8,20 +8,15 @@ import {
import { Meta } from "../meta";
import { isDataQuery } from "../common/util";
import { RowStream } from "./stream/rowStream";
-import { JSONStream } from "./stream/jsonStream";
-import {
- normalizeResponse,
- getNormalizedStatistics
-} from "./normalizeResponse";
+import { normalizeResponse } from "./normalizeResponse";
import { CompositeError } from "../common/errors";
export class Statement {
+ private rowStream: RowStream;
private context: Context;
private query: string;
private executeQueryOptions: ExecuteQueryOptions;
-
- private text;
- private rowStream: RowStream;
+ private readonly text: string;
constructor(
context: Context,
diff --git a/src/types.ts b/src/types.ts
index 1daaaed1..e8b152ff 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -35,6 +35,7 @@ export type QuerySettings = Record<
> & {
output_format?: OutputFormat;
internal?: Record[];
+ async?: boolean;
};
export type RowParser = (row: string, isLastRow: boolean) => any;
diff --git a/test/integration/v2/serverSideAsync.test.ts b/test/integration/v2/serverSideAsync.test.ts
new file mode 100644
index 00000000..64974903
--- /dev/null
+++ b/test/integration/v2/serverSideAsync.test.ts
@@ -0,0 +1,128 @@
+import { Firebolt } from "../../../src/index";
+
+const connectionParams = {
+ auth: {
+ client_id: process.env.FIREBOLT_CLIENT_ID as string,
+ client_secret: process.env.FIREBOLT_CLIENT_SECRET as string
+ },
+ account: process.env.FIREBOLT_ACCOUNT as string,
+ database: process.env.FIREBOLT_DATABASE as string,
+ engineName: process.env.FIREBOLT_ENGINE_NAME as string
+};
+
+const testTableName = "test_insert_async";
+const longSelect = "SELECT checksum(*) FROM GENERATE_SERIES(1, 2500000000)"; // approx 3 sec
+jest.setTimeout(50000);
+
+describe("server side async integration test", () => {
+ afterEach(async () => {
+ const firebolt = Firebolt({
+ apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
+ });
+ const connection = await firebolt.connect(connectionParams);
+ await connection.execute(`DROP TABLE IF EXISTS ${testTableName}`);
+ });
+
+ it("can submit async query", async () => {
+ const firebolt = Firebolt({
+ apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
+ });
+ const connection = await firebolt.connect(connectionParams);
+ await connection.execute(
+ `CREATE TABLE IF NOT EXISTS ${testTableName} (a long)`
+ );
+
+ const statement = await connection.executeAsync(
+ `INSERT INTO ${testTableName} VALUES (1)`
+ );
+ await expect(statement.fetchResult()).rejects.toThrow();
+ expect(statement.asyncQueryToken).toBeDefined();
+ expect(statement.asyncQueryToken).not.toBe("");
+ });
+
+ it("can check long-running query status", async () => {
+ const firebolt = Firebolt({
+ apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
+ });
+ const connection = await firebolt.connect(connectionParams);
+ await connection.execute(
+ `CREATE TABLE IF NOT EXISTS ${testTableName} (a long)`
+ );
+
+ const statement = await connection.executeAsync(
+ `INSERT INTO ${testTableName} ${longSelect}`
+ );
+ const token = statement.asyncQueryToken;
+ expect(token).toBeDefined();
+ expect(token).not.toBe("");
+ const isRunning = await connection.isAsyncQueryRunning(token);
+ expect(isRunning).toBe(true);
+ const isSuccessful = await connection.isAsyncQuerySuccessful(token);
+ expect(isSuccessful).not.toBeDefined();
+ await new Promise(resolve => setTimeout(resolve, 6000)); // wait for the query to finish
+ const isRunningAfter = await connection.isAsyncQueryRunning(token);
+ expect(isRunningAfter).toBe(false);
+ const isSuccessfulAfter = await connection.isAsyncQuerySuccessful(token);
+ expect(isSuccessfulAfter).toBe(true);
+ const statement2 = await connection.execute(
+ `SELECT * FROM ${testTableName}`
+ );
+ const { data } = await statement2.fetchResult();
+ expect(data.length).toBe(1);
+ });
+
+ it("can cancel long-running query", async () => {
+ const firebolt = Firebolt({
+ apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
+ });
+ const connection = await firebolt.connect(connectionParams);
+ await connection.execute(
+ `CREATE TABLE IF NOT EXISTS ${testTableName} (a long)`
+ );
+
+ const statement = await connection.executeAsync(
+ `INSERT INTO ${testTableName} ${longSelect}`
+ );
+ const token = statement.asyncQueryToken;
+ expect(token).toBeDefined();
+ expect(token).not.toBe("");
+ await connection.cancelAsyncQuery(token);
+ await new Promise(resolve => setTimeout(resolve, 200)); // wait for the cancellation to take effect
+ const isRunning = await connection.isAsyncQueryRunning(token);
+ expect(isRunning).toBe(false);
+ const isSuccessful = await connection.isAsyncQuerySuccessful(token);
+ expect(isSuccessful).toBe(false);
+ });
+
+ it("test can check execution from another connection", async () => {
+ const firebolt = Firebolt({
+ apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
+ });
+ const connection = await firebolt.connect(connectionParams);
+ await connection.execute(
+ `CREATE TABLE IF NOT EXISTS ${testTableName} (a long)`
+ );
+
+ const statement = await connection.executeAsync(
+ `INSERT INTO ${testTableName} ${longSelect}`
+ );
+ const token = statement.asyncQueryToken;
+ expect(token).toBeDefined();
+ expect(token).not.toBe("");
+ const connection2 = await firebolt.connect(connectionParams);
+ const isRunning = await connection2.isAsyncQueryRunning(token);
+ expect(isRunning).toBe(true);
+ const isSuccessful = await connection2.isAsyncQuerySuccessful(token);
+ expect(isSuccessful).not.toBeDefined();
+ await new Promise(resolve => setTimeout(resolve, 3000)); // wait for the query to finish
+ const isRunningAfter = await connection2.isAsyncQueryRunning(token);
+ expect(isRunningAfter).toBe(false);
+ const isSuccessfulAfter = await connection2.isAsyncQuerySuccessful(token);
+ expect(isSuccessfulAfter).toBe(true);
+ const statement2 = await connection2.execute(
+ `SELECT * FROM ${testTableName}`
+ );
+ const { data } = await statement2.fetchResult();
+ expect(data.length).toBe(1);
+ });
+});
diff --git a/test/unit/http.test.ts b/test/unit/http.test.ts
index 53b6421a..6228ae76 100644
--- a/test/unit/http.test.ts
+++ b/test/unit/http.test.ts
@@ -114,7 +114,7 @@ describe.each([
);
server.use(
rest.post(`https://${apiEndpoint}/engines`, (req, res, ctx) => {
- expect(req.headers.get("Firebolt-Protocol-Version")).toEqual("2.1");
+ expect(req.headers.get("Firebolt-Protocol-Version")).toEqual("2.3");
return res(ctx.json({ ok: true }));
})
);
diff --git a/test/unit/v1/connection.test.ts b/test/unit/v1/connection.test.ts
index 7004c4d5..fce4de06 100644
--- a/test/unit/v1/connection.test.ts
+++ b/test/unit/v1/connection.test.ts
@@ -266,4 +266,29 @@ describe("Connection v1", () => {
expect(record.one).toBe(1);
expect(other_account_used).toBe(true);
});
+ it("verify async methods throw an error", async () => {
+ const firebolt = Firebolt({ apiEndpoint });
+
+ const connectionParams: ConnectionOptions = {
+ auth: {
+ username: "user",
+ password: "pass"
+ },
+ database: "dummy",
+ engineName: "dummy",
+ account: accountName
+ };
+
+ const connection = await firebolt.connect(connectionParams);
+ await expect(connection.executeAsync("SELECT 1")).rejects.toThrow();
+ await expect(
+ connection.isAsyncQueryRunning("fake_query_id")
+ ).rejects.toThrow(/.*not supported.*/);
+ await expect(
+ connection.isAsyncQuerySuccessful("fake_query_id")
+ ).rejects.toThrow(/.*not supported.*/);
+ await expect(connection.cancelAsyncQuery("fake_query_id")).rejects.toThrow(
+ /.*not supported.*/
+ );
+ });
});
diff --git a/test/unit/v2/connection.test.ts b/test/unit/v2/connection.test.ts
index 390704d5..58774cf2 100644
--- a/test/unit/v2/connection.test.ts
+++ b/test/unit/v2/connection.test.ts
@@ -1,4 +1,4 @@
-import { setupServer } from "msw/node";
+import { setupServer, SetupServerApi } from "msw/node";
import { rest } from "msw";
import { Firebolt } from "../../../src";
import { ConnectionOptions } from "../../../src/types";
@@ -26,6 +26,35 @@ const engineUrlResponse = {
rows: 1
};
+const asyncQueryResponse = {
+ message: "the query was accepted for async processing",
+ monitorSql: "CALL fb_GetAsyncStatus('async_query_token');",
+ token: "async_query_token"
+};
+
+// Helper function to setup mock server with auth
+const setupMockServer = (server: SetupServerApi) => {
+ server.use(
+ rest.post(`https://id.fake.firebolt.io/oauth/token`, (req, res, ctx) => {
+ return res(
+ ctx.json({
+ access_token: "fake_access_token"
+ })
+ );
+ }),
+ rest.get(
+ `https://api.fake.firebolt.io/web/v3/account/my_account/engineUrl`,
+ (req, res, ctx) => {
+ return res(
+ ctx.json({
+ engineUrl: "https://some_system_engine.com"
+ })
+ );
+ }
+ )
+ );
+};
+
describe("Connection V2", () => {
const server = setupServer();
@@ -167,24 +196,8 @@ describe("Connection V2", () => {
apiEndpoint
});
+ setupMockServer(server);
server.use(
- rest.post(`https://id.fake.firebolt.io/oauth/token`, (req, res, ctx) => {
- return res(
- ctx.json({
- access_token: "fake_access_token"
- })
- );
- }),
- rest.get(
- `https://api.fake.firebolt.io/web/v3/account/my_account/engineUrl`,
- (req, res, ctx) => {
- return res(
- ctx.json({
- engineUrl: "https://some_system_engine.com"
- })
- );
- }
- ),
rest.post(
`https://some_system_engine.com/${QUERY_URL}`,
(req, res, ctx) => {
@@ -258,4 +271,178 @@ describe("Connection V2", () => {
expect(engineUrlCalls).toBe(2);
});
+
+ it("executes async query successfully", async () => {
+ const firebolt = Firebolt({
+ apiEndpoint
+ });
+
+ setupMockServer(server);
+ server.use(
+ rest.post(
+ `https://some_system_engine.com/${QUERY_URL}`,
+ (req, res, ctx) => {
+ return res(ctx.json(asyncQueryResponse));
+ }
+ )
+ );
+
+ const connectionParams: ConnectionOptions = {
+ auth: {
+ client_id: "dummy",
+ client_secret: "dummy"
+ },
+ database: "dummy",
+ account: "my_account"
+ };
+
+ const connection = await firebolt.connect(connectionParams);
+ const asyncStatement = await connection.executeAsync("INSERT 1");
+ expect(asyncStatement).toBeDefined();
+ expect(asyncStatement.asyncQueryToken).toBe("async_query_token");
+ });
+
+ it("throws error for async SET statement", async () => {
+ const firebolt = Firebolt({
+ apiEndpoint
+ });
+ setupMockServer(server);
+
+ const connectionParams: ConnectionOptions = {
+ auth: {
+ client_id: "dummy",
+ client_secret: "dummy"
+ },
+ account: "my_account"
+ };
+
+ const connection = await firebolt.connect(connectionParams);
+ await expect(
+ connection.executeAsync("SET some_setting = 1")
+ ).rejects.toThrow("SET statements cannot be executed asynchronously.");
+ });
+
+ const asyncQueryStatusTest = async (
+ status: string,
+ expectedRunning: boolean,
+ expectedSuccessful: boolean | undefined
+ ) => {
+ const firebolt = Firebolt({
+ apiEndpoint
+ });
+
+ setupMockServer(server);
+ server.use(
+ rest.post(
+ `https://some_system_engine.com/${QUERY_URL}`,
+ async (req, res, ctx) => {
+ let result;
+ const body = await req.text();
+ if (
+ body.includes("fb_GetAsyncStatus") &&
+ body.includes("async_query_token")
+ ) {
+ result = ctx.json({
+ meta: [
+ { name: "status", type: "Text" },
+ { name: "query_id", type: "Text" }
+ ],
+ data: [[status, "query_id_123"]],
+ rows: 1
+ });
+ } else {
+ result = ctx.json(asyncQueryResponse);
+ }
+ return res(result);
+ }
+ )
+ );
+
+ const connectionParams: ConnectionOptions = {
+ auth: {
+ client_id: "dummy",
+ client_secret: "dummy"
+ },
+ database: "dummy",
+ account: "my_account"
+ };
+
+ const connection = await firebolt.connect(connectionParams);
+ const asyncStatement = await connection.executeAsync("INSERT 1");
+ const isRunning = await connection.isAsyncQueryRunning(
+ asyncStatement.asyncQueryToken
+ );
+ expect(isRunning).toBe(expectedRunning);
+ const isSuccessful = await connection.isAsyncQuerySuccessful(
+ asyncStatement.asyncQueryToken
+ );
+ expect(isSuccessful).toBe(expectedSuccessful);
+ };
+
+ it("checks status when async query is successful", async () => {
+ await asyncQueryStatusTest("ENDED_SUCCESSFULLY", false, true);
+ });
+
+ it("checks status when async query is running", async () => {
+ await asyncQueryStatusTest("RUNNING", true, undefined);
+ });
+
+ it("checks status when async query is failed", async () => {
+ await asyncQueryStatusTest("ENDED_WITH_ERROR", false, false);
+ });
+
+ it("cancels async query correctly", async () => {
+ let cancelQueryExecuted = false;
+ const firebolt = Firebolt({
+ apiEndpoint
+ });
+
+ setupMockServer(server);
+ server.use(
+ rest.post(
+ `https://some_system_engine.com/${QUERY_URL}?output_format=JSON_Compact`,
+ async (req, res, ctx) => {
+ const body = await req.text();
+ if (
+ body.includes("fb_GetAsyncStatus") &&
+ body.includes("async_query_token")
+ ) {
+ return res(
+ ctx.json({
+ meta: [
+ { name: "status", type: "Text" },
+ { name: "query_id", type: "Text" }
+ ],
+ data: [["RUNNING", "query_id_123"]],
+ rows: 1
+ })
+ );
+ } else if (
+ body.includes("CANCEL QUERY") &&
+ body.includes("query_id_123")
+ ) {
+ cancelQueryExecuted = true;
+ return res(ctx.json({}));
+ } else {
+ return res(ctx.json(asyncQueryResponse));
+ }
+ }
+ )
+ );
+
+ const connectionParams: ConnectionOptions = {
+ auth: {
+ client_id: "dummy",
+ client_secret: "dummy"
+ },
+ account: "my_account"
+ };
+
+ const connection = await firebolt.connect(connectionParams);
+ const asyncStatement = await connection.executeAsync("INSERT 1");
+ expect(asyncStatement.asyncQueryToken).not.toBe("");
+ await connection.cancelAsyncQuery(asyncStatement.asyncQueryToken);
+ await new Promise(resolve => setTimeout(resolve, 100)); // somehow we need it to wait for the flag switch
+ expect(cancelQueryExecuted).toBe(true);
+ });
});