Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ npm-debug.log*
.#*
coverage/
allure-results/
.idea
37 changes: 36 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions src/connection/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { CompositeError } from "../common/errors";
import JSONbig from "json-bigint";
import { QueryFormatter } from "../formatter/base";
import { AsyncStatement } from "../statement/async";
import { StreamStatement } from "../statement/stream";
import { Response, Headers } from "node-fetch";

const defaultQuerySettings = {
output_format: OutputFormat.COMPACT
Expand Down Expand Up @@ -166,6 +168,11 @@ export abstract class Connection {
executeQueryOptions?: ExecuteQueryOptions
): Promise<AsyncStatement>;

abstract executeStream(
query: string,
executeQueryOptions?: ExecuteQueryOptions
): Promise<StreamStatement>;

abstract isAsyncQueryRunning(token: string): Promise<boolean>;

abstract isAsyncQuerySuccessful(token: string): Promise<boolean | undefined>;
Expand Down
10 changes: 10 additions & 0 deletions src/connection/connection_v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Connection as BaseConnection } from "./base";
import { ResourceManager } from "../service";
import { ExecuteQueryOptions } from "../types";
import { AsyncStatement } from "../statement/async";
import { StreamStatement } from "../statement/stream";

export interface AccountInfo {
id: string;
Expand Down Expand Up @@ -93,4 +94,13 @@ export class ConnectionV1 extends BaseConnection {
"Asynchronous execution is not supported in this Firebolt version."
);
}

async executeStream(
query: string,
executeQueryOptions?: ExecuteQueryOptions
): Promise<StreamStatement> {
throw new Error(
"Stream execution is not supported in this Firebolt version."
);
}
}
21 changes: 20 additions & 1 deletion src/connection/connection_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { ACCOUNT_SYSTEM_ENGINE, QUERY_URL } from "../common/api";

import { Connection as BaseConnection } from "./base";
import { Cache, inMemoryCache, noneCache } from "../common/tokenCache";
import { ExecuteQueryOptions } from "../types";
import { ExecuteQueryOptions, OutputFormat } from "../types";
import { AsyncStatement } from "../statement/async";
import { StreamStatement } from "../statement/stream";

export class ConnectionV2 extends BaseConnection {
private get account(): string {
Expand Down Expand Up @@ -103,6 +104,24 @@ export class ConnectionV2 extends BaseConnection {
});
}

async executeStream(
query: string,
executeQueryOptions: ExecuteQueryOptions = {}
): Promise<StreamStatement> {
const { response } = await this.prepareAndExecuteQuery(query, {
...executeQueryOptions,
settings: {
...executeQueryOptions?.settings,
output_format: OutputFormat.JSON_LINES
}
});

return new StreamStatement({
response,
executeQueryOptions
});
}

private async getAsyncQueryInfo(token: string) {
const query = `CALL fb_GetAsyncStatus('${token}')`;

Expand Down
30 changes: 24 additions & 6 deletions src/statement/normalizeResponse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {
QueryResponse,
OutputFormat,
ExecuteQueryOptions,
OutputFormat,
QueryResponse,
Row
} from "../types";
import { Meta } from "../meta";
Expand Down Expand Up @@ -35,11 +35,11 @@ export const normalizeColumn = (column: { name: string; type: string }) => {
return new Meta(column);
};

const getNormalizedMeta = (response: ParsedResponse): Meta[] => {
if (!response.meta) {
export const getNormalizedMeta = (meta: any[]): Meta[] => {
if (!meta) {
return [];
}
return response.meta.map(normalizeColumn);
return meta.map(normalizeColumn);
};

export const getNormalizedStatistics = (response: ParsedResponse) => {
Expand Down Expand Up @@ -71,7 +71,7 @@ export const normalizeResponse = (

const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow;

const meta = getNormalizedMeta(response);
const meta = getNormalizedMeta(response.meta);

const statistics = getNormalizedStatistics(response);

Expand All @@ -96,3 +96,21 @@ export const normalizeResponse = (
statistics
};
};

export const normalizeResponseRowStreaming = (
data: any[],
executeQueryOptions: ExecuteQueryOptions,
meta: Meta[]
): Row[] => {
const { response: { normalizeData = false } = {} } = executeQueryOptions;

const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow;

return data.map((row: Row) => {
const hydratedRow = hydrate(row, meta, executeQueryOptions);
if (normalizeData) {
return normalizeRow(hydratedRow, meta, executeQueryOptions);
}
return hydratedRow;
});
};
25 changes: 25 additions & 0 deletions src/statement/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { ExecuteQueryOptions } from "../types";
import { ServerSideStream } from "./stream/serverSideStream";
import { Response } from "node-fetch";

export class StreamStatement {
private readonly executeQueryOptions: ExecuteQueryOptions;
private readonly response: Response;

constructor({
response,
executeQueryOptions
}: {
response: Response;
executeQueryOptions: ExecuteQueryOptions;
}) {
this.response = response;
this.executeQueryOptions = executeQueryOptions;
}

async streamResult() {
return {
data: new ServerSideStream(this.response, this.executeQueryOptions)
};
}
}
71 changes: 0 additions & 71 deletions src/statement/stream/jsonStream.ts

This file was deleted.

65 changes: 65 additions & 0 deletions src/statement/stream/serverSideStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { Readable } from "stream";
import JSONbig from "json-bigint";
import readline from "readline";
import {
getNormalizedMeta,
normalizeResponseRowStreaming
} from "../normalizeResponse";
import { Response } from "node-fetch";
import { ExecuteQueryOptions } from "../../types";
import { Meta } from "../../meta";

export class ServerSideStream extends Readable {
private meta: Meta[] = [];
constructor(
private readonly response: Response,
private readonly executeQueryOptions: ExecuteQueryOptions
) {
super({ objectMode: true });
const readLine = readline.createInterface({
input: response.body,
crlfDelay: Infinity
});

const lineParser = (line: string) => {
try {
if (line.trim()) {
const parsed = JSONbig.parse(line);
if (parsed) {
if (parsed.message_type === "DATA") {
if (parsed.data) {
const normalizedData = normalizeResponseRowStreaming(
parsed.data,
executeQueryOptions,
this.meta
);
for (const data of normalizedData) {
this.emit("data", data);
}
}
} else if (parsed.message_type === "START") {
const metaDataJson = JSONbig.parse(line);
this.meta = getNormalizedMeta(metaDataJson.result_columns);
this.emit("meta", this.meta);
} else if (parsed.message_type === "FINISH_SUCCESSFULLY") {
this.push(null);
} else if (parsed.message_type === "FINISH_WITH_ERROR") {
this.destroy(new Error(`Result encountered an error: ${line}`));
}
} else {
this.destroy(new Error(`Result row could not be parsed: ${line}`));
}
}
} catch (err) {
this.destroy(err);
}
};
readLine.on("line", lineParser);

readLine.on("close", () => {
this.push(null);
});
}

_read() {}
}
3 changes: 2 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ export enum OutputFormat {
// JSON_COMPACT_LIMITED = "FB_JSONCompactLimited",
COMPACT = "JSON_Compact", // supported in v3 of packdb
JSON_COMPACT = "JSONCompact", // to be as above, after ensure all clients has v3
JSON = "JSON"
JSON = "JSON",
JSON_LINES = "JSONLines_Compact"
}

export type QuerySettings = Record<
Expand Down
Loading