Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ console.log(rows)
* <a href="#fetch-result">Fetch result</a>
* <a href="#stream-result">Stream result</a>
* <a href="#result-hydration">Result hydration</a>
* <a href="#server-side-async-query-execution">Server-side async queries</a>
* <a href="#execute-async-query">Execute Async Query</a>
* <a href="#check-async-query-status">Check Async Query Status</a>
* <a href="#cancel-async-query">Cancel Async Query</a>
* <a href="#engine-management">Engine management</a>
* <a href="#getbyname">getByName</a>
* <a href="#engine">Engine</a>
Expand Down Expand Up @@ -340,6 +344,44 @@ firebolt-sdk maps SQL data types to their corresponding JavaScript equivalents.
| | STRING | String | |
| Date & Time | DATE | Date | |

<a id="Server-side async queries"></a>
## Server-side async query execution

Firebolt supports server-side asynchronous query execution. This feature allows you to run
queries in the background and fetch the results later. This is especially useful for long-running
queries that you don't want to wait for or maintain a persistent connection to the server.

<a id="Execute Async Query"></a>
### Execute Async Query

Executes a query asynchronously. This is useful for long-running queries that you don't want to block the main thread. The resulting statement does not contain data and should only be used to receive an async query token. Token can be saved elsewhere and reused, even on a new connection to check on this query.

```typescript
const statement = await connection.executeAsync(query, executeQueryOptions);
const token = statement.asyncQueryToken; // used to check query status and cancel it
// statement.fetchResult() -- not allowed as there's no result to fetch
```

<a id="Check Async Query Status"></a>
### Check Async Query Status

Checks the status of an asynchronous query. Use this to determine if the query is still running or has completed. `isAsyncQueryRunning` woudl return true or false if the query is running or has finished. `isAsyncQuerySuccessful` would return true if the query has completed successfully, false if it has failed and `undefined` if the query is still running.

```typescript
const token = statement.asyncQueryToken; // can only be fetched for async query
const isRunning = await connection.isAsyncQueryRunning(token);
const isSuccessful = await connection.isAsyncQuerySuccessful(token);
```

<a id="Cancel Async Query"></a>
### Cancel Async Query

Cancels a running asynchronous query. Use this if you need to stop a long-running query, if its execution is no longer needed.

```typescript
const token = statement.asyncQueryToken; // can only be fetched for async query
await connection.cancelAsyncQuery(token);
```

<a id="engine-management"></a>
### Engine management
Expand Down
48 changes: 36 additions & 12 deletions src/connection/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import { generateUserAgent } from "../common/util";
import { CompositeError } from "../common/errors";
import JSONbig from "json-bigint";
import { QueryFormatter } from "../formatter/base";
import { AsyncStatement } from "../statement/async";

const defaultQuerySettings = {
output_format: OutputFormat.COMPACT
};

const defaultResponseSettings = {
export const defaultResponseSettings = {
normalizeData: false
};

Expand Down Expand Up @@ -160,10 +161,21 @@ export abstract class Connection {
}
}

async execute(
abstract executeAsync(
query: string,
executeQueryOptions: ExecuteQueryOptions = {}
): Promise<Statement> {
executeQueryOptions?: ExecuteQueryOptions
): Promise<AsyncStatement>;

abstract isAsyncQueryRunning(token: string): Promise<boolean>;

abstract isAsyncQuerySuccessful(token: string): Promise<boolean | undefined>;

abstract cancelAsyncQuery(token: string): Promise<void>;

protected async prepareAndExecuteQuery(
query: string,
executeQueryOptions: ExecuteQueryOptions
): Promise<{ formattedQuery: string; response: Response }> {
const { httpClient } = this.context;

executeQueryOptions.response = {
Expand Down Expand Up @@ -201,14 +213,8 @@ export abstract class Connection {

try {
const response = await request.ready();
const text = await response.text();
await this.processHeaders(response.headers);
await this.throwErrorIfErrorBody(text, response);
return new Statement(this.context, {
query: formattedQuery,
text,
executeQueryOptions
});
return { formattedQuery, response };
} catch (error) {
// In case it was a set query, remove set parameter if query fails
if (setKey.length > 0) {
Expand All @@ -220,7 +226,25 @@ export abstract class Connection {
}
}

private async throwErrorIfErrorBody(text: string, response: Response) {
async execute(
query: string,
executeQueryOptions: ExecuteQueryOptions = {}
): Promise<Statement> {
const { formattedQuery, response } = await this.prepareAndExecuteQuery(
query,
executeQueryOptions
);

const text = await response.text();
await this.throwErrorIfErrorBody(text, response);
return new Statement(this.context, {
query: formattedQuery,
text,
executeQueryOptions
});
}

protected async throwErrorIfErrorBody(text: string, response: Response) {
// Hack, but looks like this is a limitation of the fetch API
// In order to read the body here and elesewhere, we need to clone the response
// since body can only be read once
Expand Down
30 changes: 30 additions & 0 deletions src/connection/connection_v1.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { ACCOUNT, ACCOUNT_BY_NAME } from "../common/api";
import { Connection as BaseConnection } from "./base";
import { ResourceManager } from "../service";
import { ExecuteQueryOptions } from "../types";
import { AsyncStatement } from "../statement/async";

export interface AccountInfo {
id: string;
Expand Down Expand Up @@ -63,4 +65,32 @@ export class ConnectionV1 extends BaseConnection {
async testConnection() {
await this.execute("select 1");
}

// Async methods
async isAsyncQueryRunning(token: string): Promise<boolean> {
throw new Error(
"Asynchronous query running check is not supported in this Firebolt version."
);
}

async isAsyncQuerySuccessful(token: string): Promise<boolean | undefined> {
throw new Error(
"Asynchronous query success check is not supported in this Firebolt version."
);
}

async cancelAsyncQuery(token: string): Promise<void> {
throw new Error(
"Asynchronous query cancellation is not supported in this Firebolt version."
);
}

async executeAsync(
query: string,
executeQueryOptions?: ExecuteQueryOptions
): Promise<AsyncStatement> {
throw new Error(
"Asynchronous execution is not supported in this Firebolt version."
);
}
}
68 changes: 68 additions & 0 deletions src/connection/connection_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { ACCOUNT_SYSTEM_ENGINE, QUERY_URL } from "../common/api";

import { Connection as BaseConnection } from "./base";
import { Cache, inMemoryCache, noneCache } from "../common/tokenCache";
import { ExecuteQueryOptions } from "../types";
import { AsyncStatement } from "../statement/async";

export class ConnectionV2 extends BaseConnection {
private get account(): string {
Expand Down Expand Up @@ -70,6 +72,72 @@ export class ConnectionV2 extends BaseConnection {
return this.engineEndpoint;
}

// Async methods
async executeAsync(
query: string,
executeQueryOptions: ExecuteQueryOptions = {}
): Promise<AsyncStatement> {
const asyncExecuteQueryOptions = {
...executeQueryOptions,
settings: {
...executeQueryOptions.settings,
async: true
}
};

if (this.queryFormatter.isSetStatement(query)) {
// can't have an async set query
throw new Error("SET statements cannot be executed asynchronously.");
}
const { formattedQuery, response } = await this.prepareAndExecuteQuery(
query,
asyncExecuteQueryOptions
);

const text = await response.text();
await this.throwErrorIfErrorBody(text, response);
return new AsyncStatement(this.context, {
query: formattedQuery,
text,
executeQueryOptions: asyncExecuteQueryOptions
});
}

private async getAsyncQueryInfo(token: string) {
const query = `CALL fb_GetAsyncStatus('${token}')`;

const statement = await this.execute(query);
const { data, meta } = await statement.fetchResult();
const result: Record<string, any> = {};
if (data.length > 0) {
meta.forEach((field, index) => {
result[field.name] = data[0][index];
});
} else {
throw new Error("No data returned from fb_GetAsyncStatus");
}
return result;
}

async isAsyncQueryRunning(token: string): Promise<boolean> {
const info = await this.getAsyncQueryInfo(token);
return info["status"] === "RUNNING";
}

async isAsyncQuerySuccessful(token: string): Promise<boolean | undefined> {
const info = await this.getAsyncQueryInfo(token);
if (info["status"] === "RUNNING") {
return undefined;
}
return info["status"] === "ENDED_SUCCESSFULLY";
}

async cancelAsyncQuery(token: string): Promise<void> {
const info = await this.getAsyncQueryInfo(token);
const async_query_id = info["query_id"];
this.execute(`CANCEL QUERY WHERE query_id='${async_query_id}'`);
}

async testConnection() {
const settings = { internal: [{ auto_start_stop_control: "ignore" }] };
await this.execute("select 1", { settings });
Expand Down
2 changes: 1 addition & 1 deletion src/http/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const DEFAULT_ERROR = "Server error";
const DEFAULT_USER_AGENT = systemInfoString();

const PROTOCOL_VERSION_HEADER = "Firebolt-Protocol-Version";
const PROTOCOL_VERSION = "2.1";
const PROTOCOL_VERSION = "2.3";
const createSocket = HttpsAgent.prototype.createSocket;

const agentOptions = {
Expand Down
60 changes: 60 additions & 0 deletions src/statement/async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import JSONbig from "json-bigint";
import {
ExecuteQueryOptions,
StreamOptions,
Context,
Statistics
} from "../types";
import { Meta } from "../meta";

export class AsyncStatement {
private readonly asyncToken: string;
private readonly context: Context;
private readonly query: string;
private readonly executeQueryOptions: ExecuteQueryOptions;
private readonly text: string;

constructor(
context: Context,
{
query,
text,
executeQueryOptions
}: {
query: string;
text: string;
executeQueryOptions: ExecuteQueryOptions;
}
) {
this.context = context;
this.text = text;
this.query = query;
this.executeQueryOptions = executeQueryOptions;
this.asyncToken = this.parseResponse(this.text).token;
}

private parseResponse(response: string) {
const parsed = JSONbig.parse(response);
const { token, message, monitorSql } = parsed;
return {
token,
message,
monitorSql
};
}
get asyncQueryToken(): string {
return this.asyncToken;
}

async streamResult(options?: StreamOptions) {
throw new Error("Method not allowed for async statements");
}

async fetchResult(): Promise<{
data: any;
meta: Meta[];
statistics: Statistics | null;
}> {
throw new Error("Method not allowed for async statements");
}
}
11 changes: 3 additions & 8 deletions src/statement/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,15 @@ import {
import { Meta } from "../meta";
import { isDataQuery } from "../common/util";
import { RowStream } from "./stream/rowStream";
import { JSONStream } from "./stream/jsonStream";
import {
normalizeResponse,
getNormalizedStatistics
} from "./normalizeResponse";
import { normalizeResponse } from "./normalizeResponse";
import { CompositeError } from "../common/errors";

export class Statement {
private rowStream: RowStream;
private context: Context;
private query: string;
private executeQueryOptions: ExecuteQueryOptions;

private text;
private rowStream: RowStream;
private readonly text: string;

constructor(
context: Context,
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export type QuerySettings = Record<
> & {
output_format?: OutputFormat;
internal?: Record<string, string | number>[];
async?: boolean;
};

export type RowParser = (row: string, isLastRow: boolean) => any;
Expand Down
Loading
Loading