diff --git a/.gitignore b/.gitignore index b9bd3ab3..0184f9f7 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ npm-debug.log* .#* coverage/ allure-results/ +.idea diff --git a/README.md b/README.md index 2906e579..402f3c99 100644 --- a/README.md +++ b/README.md @@ -617,37 +617,55 @@ await database.delete(); ### Streaming results -The recommended way to consume query results is by using streams. +Streaming can only be used with the `executeStream` method. +The `streamResult` method of a statement returned by the regular `execute` method returns an in-memory stream of results, rather than dynamically fetching them from the response. This is further explained in the [in-memory stream](#in-memory-stream) section. -For convenience, `statement.streamResult` also returns `meta: Promise` and `statistics: Promise`, which are wrappers over `data.on('metadata')` and `data.on('statistics')`. +The recommended way to consume query results is by using streams with standard events: +* `data.on('meta')` +* `data.on('data')` +* `data.on('end')` +* `data.on('error')` ```typescript const firebolt = Firebolt(); const connection = await firebolt.connect(connectionParams); -const statement = await connection.execute("SELECT 1"); +const statement = await connection.executeStream("SELECT 1"); -const { - data, - meta: metaPromise, - statistics: statisticsPromise -} = await statement.streamResult(); +const { data } = await statement.streamResult(); const rows: unknown[] = []; -const meta = await metaPromise; - -for await (const row of data) { - rows.push(row); -} +const meta = await stream.once(data, "meta"); -const statistics = await statisticsPromise +data.on("data", data => {; + rows.push(data); +}); console.log(meta); -console.log(statistics); console.log(rows) ``` +If an error occurs before streaming starts, or during the first packet, the error will be thrown by the `executeStream` method. If the error occurs during streaming, it will be emitted by the stream. 
+```typescript +try { + await connection.executeStream("select *1;"); +} catch (error) { + //error is thrown directly since this is a syntax error +} + +const statement = await connection.executeStream( + "select 1/(i-100000) as a from generate_series(1,100000) as i" +); +const { data } = await statement.streamResult(); + +data.on("error", error => { + //error is emitted by the stream after first chunk of results + console.log(error); +}); + +``` + ### Custom stream transformers @@ -679,7 +697,7 @@ const serializedStream = new SerializeRowStream() const firebolt = Firebolt(); const connection = await firebolt.connect(connectionParams); -const statement = await connection.execute("select 1 union all select 2"); +const statement = await connection.executeStream("select * from generate_series(1, 1000)"); const { data } = await statement.streamResult(); @@ -687,14 +705,41 @@ const { data } = await statement.streamResult(); data.pipe(serializedStream).pipe(process.stdout); ``` -Or use `rowParser` that returns strings or Buffer: + +### In-memory stream + +When using the streamResult method on the object returned from a simple execute method, the driver will return +an in-memory stream of the result. This is useful for small result sets, but not recommended for large result sets. + +In this case the whole result will be first fetched in memory and then made available via streamResult. 
+This is done for compatibility reasons and has no performance benefits compared to using fetchResult ```typescript -const { data } = await statement.streamResult({ - rowParser: (row: string) => `${row}\n` -}); +const firebolt = Firebolt(); + +const connection = await firebolt.connect(connectionParams); + +const statement = await connection.execute("SELECT 1"); + +const { + data, + meta: metaPromise, + statistics: statisticsPromise +} = await statement.streamResult(); -data.pipe(process.stdout); +const rows: unknown[] = []; + +const meta = await metaPromise; + +for await (const row of data) { + rows.push(row); +} + +const statistics = await statisticsPromise + +console.log(meta); +console.log(statistics); +console.log(rows) ``` ## Development process diff --git a/package-lock.json b/package-lock.json index b3730077..bff5527d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,8 @@ "abort-controller": "^3.0.0", "agentkeepalive": "^4.5.0", "json-bigint": "^1.0.0", - "node-fetch": "^2.6.6" + "node-fetch": "^2.6.6", + "stream": "^0.0.3" }, "devDependencies": { "@types/jest": "^29.5.12", @@ -2718,6 +2719,18 @@ "dot-prop": "^5.1.0" } }, + "node_modules/component-emitter": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-2.0.0.tgz", + "integrity": "sha512-4m5s3Me2xxlVKG9PkZpQqHQR7bgpnN7joDMJ4yvVkVXngjoITG76IaZmzmywSeRTeTpc6N6r3H3+KyUurV8OYw==", + "license": "MIT", + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/concat-map": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", @@ -8021,6 +8034,15 @@ "node": ">= 0.8" } }, + "node_modules/stream": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/stream/-/stream-0.0.3.tgz", + "integrity": "sha512-aMsbn7VKrl4A2T7QAQQbzgN7NVc70vgF5INQrBXqn4dCXN1zy3L9HGgLO5s7PExmdrzTJ8uR/27aviW8or8/+A==", + "license": "MIT", + 
"dependencies": { + "component-emitter": "^2.0.0" + } + }, "node_modules/strict-event-emitter": { "version": "0.2.4", "resolved": "https://registry.npmjs.org/strict-event-emitter/-/strict-event-emitter-0.2.4.tgz", @@ -10813,6 +10835,11 @@ "dot-prop": "^5.1.0" } }, + "component-emitter": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-2.0.0.tgz", + "integrity": "sha512-4m5s3Me2xxlVKG9PkZpQqHQR7bgpnN7joDMJ4yvVkVXngjoITG76IaZmzmywSeRTeTpc6N6r3H3+KyUurV8OYw==" + }, "concat-map": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", @@ -14780,6 +14807,14 @@ "integrity": "sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ==", "dev": true }, + "stream": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/stream/-/stream-0.0.3.tgz", + "integrity": "sha512-aMsbn7VKrl4A2T7QAQQbzgN7NVc70vgF5INQrBXqn4dCXN1zy3L9HGgLO5s7PExmdrzTJ8uR/27aviW8or8/+A==", + "requires": { + "component-emitter": "^2.0.0" + } + }, "strict-event-emitter": { "version": "0.2.4", "resolved": "https://registry.npmjs.org/strict-event-emitter/-/strict-event-emitter-0.2.4.tgz", diff --git a/src/connection/base.ts b/src/connection/base.ts index bb2132f5..69f5dc4d 100644 --- a/src/connection/base.ts +++ b/src/connection/base.ts @@ -10,6 +10,8 @@ import { CompositeError } from "../common/errors"; import JSONbig from "json-bigint"; import { QueryFormatter } from "../formatter/base"; import { AsyncStatement } from "../statement/async"; +import { StreamStatement } from "../statement/stream"; +import { Response, Headers } from "node-fetch"; const defaultQuerySettings = { output_format: OutputFormat.COMPACT @@ -166,6 +168,11 @@ export abstract class Connection { executeQueryOptions?: ExecuteQueryOptions ): Promise; + abstract executeStream( + query: string, + executeQueryOptions?: ExecuteQueryOptions + ): Promise; + abstract isAsyncQueryRunning(token: string): 
Promise; abstract isAsyncQuerySuccessful(token: string): Promise; diff --git a/src/connection/connection_v1.ts b/src/connection/connection_v1.ts index 9083599f..7a5a31e7 100644 --- a/src/connection/connection_v1.ts +++ b/src/connection/connection_v1.ts @@ -3,6 +3,7 @@ import { Connection as BaseConnection } from "./base"; import { ResourceManager } from "../service"; import { ExecuteQueryOptions } from "../types"; import { AsyncStatement } from "../statement/async"; +import { StreamStatement } from "../statement/stream"; export interface AccountInfo { id: string; @@ -93,4 +94,13 @@ export class ConnectionV1 extends BaseConnection { "Asynchronous execution is not supported in this Firebolt version." ); } + + async executeStream( + query: string, + executeQueryOptions?: ExecuteQueryOptions + ): Promise { + throw new Error( + "Stream execution is not supported in this Firebolt version." + ); + } } diff --git a/src/connection/connection_v2.ts b/src/connection/connection_v2.ts index b996b9c4..f99edba8 100644 --- a/src/connection/connection_v2.ts +++ b/src/connection/connection_v2.ts @@ -3,8 +3,9 @@ import { ACCOUNT_SYSTEM_ENGINE, QUERY_URL } from "../common/api"; import { Connection as BaseConnection } from "./base"; import { Cache, inMemoryCache, noneCache } from "../common/tokenCache"; -import { ExecuteQueryOptions } from "../types"; +import { ExecuteQueryOptions, OutputFormat } from "../types"; import { AsyncStatement } from "../statement/async"; +import { StreamStatement } from "../statement/stream"; export class ConnectionV2 extends BaseConnection { private get account(): string { @@ -103,6 +104,24 @@ export class ConnectionV2 extends BaseConnection { }); } + async executeStream( + query: string, + executeQueryOptions: ExecuteQueryOptions = {} + ): Promise { + const { response } = await this.prepareAndExecuteQuery(query, { + ...executeQueryOptions, + settings: { + ...executeQueryOptions?.settings, + output_format: OutputFormat.JSON_LINES + } + }); + + return new 
StreamStatement({ + response, + executeQueryOptions + }); + } + private async getAsyncQueryInfo(token: string) { const query = `CALL fb_GetAsyncStatus('${token}')`; diff --git a/src/index.ts b/src/index.ts index 7eee8b4a..2b3ac721 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,7 +38,5 @@ export { QueryFormatter } from "./formatter/base"; -export { JSONParser } from "./statement/stream/parser"; - export type { Connection } from "./connection"; export type { Meta } from "./meta"; diff --git a/src/statement/dataTypes.ts b/src/statement/dataTypes.ts index d4252703..71abe89d 100644 --- a/src/statement/dataTypes.ts +++ b/src/statement/dataTypes.ts @@ -11,13 +11,16 @@ const typeMapping = { decimal: "decimal", decimal_ext: "decimal", long: "long", + bigint: "long", float: "float", + real: "float", float32: "float", float64: "double", double: "double", double32: "double", double64: "double", - integer: "integer", + "double precision": "double", + integer: "int", int: "int", uint: "int", int8: "int", @@ -37,12 +40,15 @@ const getMappedType = (innerType: string) => { return type; } if ( - innerType.match(/datetime64(.+)/i) || - innerType.match(/timestamp_ext(.+)/i) + RegExp(/datetime64(.+)/i).exec(innerType) || + RegExp(/timestamp_ext(.+)/i).exec(innerType) ) { return typeMapping.timestamp; } - if (innerType.match(/decimal(.+)/i)) { + if ( + RegExp(/decimal(.+)/i).exec(innerType) || + RegExp(/numeric(.+)/i).exec(innerType) + ) { return typeMapping.decimal; } }; @@ -83,6 +89,7 @@ export const STRING_TYPES = withNullableTypes(["string", "text"]); export const BYTEA_TYPES = withNullableTypes(["bytea"]); +//todo fix nullable types FIR-45354 export const getFireboltType = (type: string): string => { const key = type.toLowerCase(); const match = key.match(COMPLEX_TYPE); diff --git a/src/statement/normalizeResponse.ts b/src/statement/normalizeResponse.ts index 5532b96b..2798819c 100644 --- a/src/statement/normalizeResponse.ts +++ b/src/statement/normalizeResponse.ts @@ -1,7 
+1,7 @@ import { - QueryResponse, - OutputFormat, ExecuteQueryOptions, + OutputFormat, + QueryResponse, Row } from "../types"; import { Meta } from "../meta"; @@ -35,11 +35,11 @@ export const normalizeColumn = (column: { name: string; type: string }) => { return new Meta(column); }; -const getNormalizedMeta = (response: ParsedResponse): Meta[] => { - if (!response.meta) { +export const getNormalizedMeta = (meta: any[]): Meta[] => { + if (!meta) { return []; } - return response.meta.map(normalizeColumn); + return meta.map(normalizeColumn); }; export const getNormalizedStatistics = (response: ParsedResponse) => { @@ -71,7 +71,7 @@ export const normalizeResponse = ( const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow; - const meta = getNormalizedMeta(response); + const meta = getNormalizedMeta(response.meta); const statistics = getNormalizedStatistics(response); @@ -96,3 +96,21 @@ export const normalizeResponse = ( statistics }; }; + +export const normalizeResponseRowStreaming = ( + data: any[], + executeQueryOptions: ExecuteQueryOptions, + meta: Meta[] +): Row[] => { + const { response: { normalizeData = false } = {} } = executeQueryOptions; + + const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow; + + return data.map((row: Row) => { + const hydratedRow = hydrate(row, meta, executeQueryOptions); + if (normalizeData) { + return normalizeRow(hydratedRow, meta, executeQueryOptions); + } + return hydratedRow; + }); +}; diff --git a/src/statement/stream.ts b/src/statement/stream.ts new file mode 100644 index 00000000..25d10254 --- /dev/null +++ b/src/statement/stream.ts @@ -0,0 +1,25 @@ +import { ExecuteQueryOptions } from "../types"; +import { ServerSideStream } from "./stream/serverSideStream"; +import { Response } from "node-fetch"; + +export class StreamStatement { + private readonly executeQueryOptions: ExecuteQueryOptions; + private readonly response: Response; + + constructor({ + response, + executeQueryOptions + }: { + 
response: Response; + executeQueryOptions: ExecuteQueryOptions; + }) { + this.response = response; + this.executeQueryOptions = executeQueryOptions; + } + + async streamResult() { + return { + data: new ServerSideStream(this.response, this.executeQueryOptions) + }; + } +} diff --git a/src/statement/stream/jsonStream.ts b/src/statement/stream/jsonStream.ts deleted file mode 100644 index 657bbd73..00000000 --- a/src/statement/stream/jsonStream.ts +++ /dev/null @@ -1,71 +0,0 @@ -import JSONbig from "json-bigint"; -import { StreamOptions, ExecuteQueryOptions, RowParser } from "../../types"; -import { Meta } from "../../meta"; -import { normalizeColumn, normalizeRow } from "../normalizeResponse"; -import { hydrateRow } from "../hydrateResponse"; -import { RowStream } from "./rowStream"; -import { JSONParser } from "./parser"; - -export class JSONStream { - jsonParser: JSONParser; - options?: StreamOptions; - executeQueryOptions: ExecuteQueryOptions; - emitter: RowStream; - rowParser: RowParser; - - constructor({ - emitter, - options, - executeQueryOptions - }: { - emitter: RowStream; - options?: StreamOptions; - executeQueryOptions: ExecuteQueryOptions; - }) { - this.emitter = emitter; - this.options = options; - this.executeQueryOptions = executeQueryOptions; - this.rowParser = this.options?.rowParser || this.defaultRowParser; - - this.jsonParser = new JSONParser({ - onMetadataParsed: columns => { - this.emitter.emit("metadata", columns); - }, - hydrateRow: this.rowParser, - hydrateColumn: (columnStr: string) => { - const column = JSONbig.parse(columnStr); - return normalizeColumn(column); - } - }); - } - - defaultRowParser = (row: string, isLastRow: boolean) => { - const normalizeData = this.executeQueryOptions.response?.normalizeData; - const parsed = JSONbig.parse(row); - const hydrate = this.executeQueryOptions.response?.hydrateRow || hydrateRow; - const result = this.getResult(0); - const columns = result.columns; - const hydratedRow = hydrate( - parsed, - columns 
as Meta[], - this.executeQueryOptions - ); - if (normalizeData) { - const normalizedRow = normalizeRow( - hydratedRow, - columns as Meta[], - this.executeQueryOptions - ); - return normalizedRow; - } - return hydratedRow; - }; - - processLine(line: string) { - this.jsonParser.processLine(line); - } - - getResult(index: number) { - return this.jsonParser.results[index]; - } -} diff --git a/src/statement/stream/parser.test.ts b/src/statement/stream/parser.test.ts deleted file mode 100644 index 283609c8..00000000 --- a/src/statement/stream/parser.test.ts +++ /dev/null @@ -1,95 +0,0 @@ -import { JSONParser } from "./parser"; - -const body1 = ` -{ - "query": - { - "query_id": "50cd4109-02de-4e19-b995-5e71a5f16fb7" - }, - "meta": - [ - { - "name": "engine_name", - "type": "text" - } - ], - - "data": - [ - ["peacekeeper_ns_2023_01_20_08_06_45_153_create_engine_test"], - ["Aymeric_test_2_Analytics"], - ["integration_testing_windowslatest_37_1675858870"], - ["test_4"], - ["peacekeeper_ns_2023_01_21_11_03_09_210_CREATE_ENGINE_TEST_2_3"] - ], - - "rows": 5, - - "rows_before_limit_at_least": 233, - - "statistics": - { - "elapsed": 0.42408089, - "rows_read": 233, - "bytes_read": 30680, - "time_before_execution": 0.000588018, - "time_to_execute": 0.423289816, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0 - } -} -`; -const body2 = ` -{ -"query": -{ -"query_id": "50cd4109-02de-4e19-b995-5e71a5f16fb9" -}, -"meta": -[ -{ -"name": "engine_name", -"type": "text" -} -], - -"data": -[ -["peacekeeper_ns_2023_01_20_08_06_45_153_create_engine_test"], -["Aymeric_test_2_Analytics"], -["integration_testing_windowslatest_37_1675858870"] -], - -"rows": 3, - -"rows_before_limit_at_least": 233, - -"statistics": -{ -"elapsed": 0.42408089, -"rows_read": 233, -"bytes_read": 30680, -"time_before_execution": 0.000588018, -"time_to_execute": 0.423289816, -"scanned_bytes_cache": 0, -"scanned_bytes_storage": 0 -} -} -`; -describe("parser", () => { - it("handles single reponse", () => { - const 
parser = new JSONParser({}); - parser.processBody(body1); - expect(parser.results[0].rows).toHaveLength(5); - expect(parser.results[0].columns).toHaveLength(1); - }); - it("handles multi response", () => { - const parser = new JSONParser({}); - parser.processBody(` -${body1} -${body2}`); - expect(parser.results[0].rows).toHaveLength(5); - expect(parser.results[1].rows).toHaveLength(3); - expect(parser.results[1].columns).toHaveLength(1); - }); -}); diff --git a/src/statement/stream/parser.ts b/src/statement/stream/parser.ts deleted file mode 100644 index 31a2a6e2..00000000 --- a/src/statement/stream/parser.ts +++ /dev/null @@ -1,213 +0,0 @@ -import JSONbig from "json-bigint"; - -export class JSONParser { - state: - | "meta" - | "meta-array" - | "rootKeys" - | "data" - | "data-array" - | "query" - | "query-object" - | "statistics-object" - | null; - - onMetadataParsed; - hydrateColumn; - hydrateRow; - - objBuffer?: string; - currentIndex: number; - - results: { - rows: unknown[]; - columns: unknown[]; - statistics: any; - query?: any; - }[]; - - constructor({ - onMetadataParsed = (columns: any) => {}, - hydrateColumn = (column: string) => JSONbig.parse(column), - hydrateRow = (row: string, isLastRow: boolean): any => JSONbig.parse(row) - }) { - this.onMetadataParsed = onMetadataParsed; - this.hydrateColumn = hydrateColumn; - this.hydrateRow = hydrateRow; - - this.results = []; - - this.state = null; - this.currentIndex = 0; - } - - fillEmptyResult() { - this.results[this.currentIndex] = { - columns: [], - rows: [], - statistics: {} - }; - } - - pushColumn(column: unknown) { - this.results[this.currentIndex].columns.push(column); - } - - pushRow(row: unknown) { - this.results[this.currentIndex].rows.push(row); - } - - pushQuery(query: any) { - this.results[this.currentIndex].query = query; - } - - pushStatistics(statistics: any) { - this.results[this.currentIndex].statistics = statistics; - } - - handleRoot(line: string) { - if (line === "{") { - this.state = 
"rootKeys"; - if (this.results.length > 0) { - this.currentIndex += 1; - } - this.fillEmptyResult(); - } - } - - handleRootKeys(line: string) { - if (line === '"query":') { - this.state = "query"; - } else if (line === '"query": {') { - this.objBuffer = "{"; - this.state = "query-object"; - } else if (line === '"meta":') { - this.state = "meta"; - } else if (line === '"data":') { - this.state = "data"; - } else if (line === '"meta": [') { - this.state = "meta-array"; - } else if (line === '"data": [') { - this.state = "data-array"; - } else if (line === '"statistics":') { - this.objBuffer = ""; - this.state = "statistics-object"; - } else if (line === "}" || line === "},") { - this.state = null; - } - } - - handleMeta(line: string) { - if (line === "[") { - this.state = "meta-array"; - } - } - - handleMetaArray(line: string) { - if (line.match(/^},?$/)) { - const columnStr = this.objBuffer + "}"; - const column = this.hydrateColumn(columnStr); - this.pushColumn(column); - this.objBuffer = undefined; - } else if (line === "{") { - this.objBuffer = line; - } else if (line.match(/^],?$/)) { - this.onMetadataParsed(this.results[this.currentIndex].columns); - this.state = "rootKeys"; - } else { - this.objBuffer += line; - } - } - - handleDataArray(line: string) { - if (line.match(/^[\]}],?$/) && this.objBuffer) { - const rowStr = this.objBuffer + line[0]; - const row = this.hydrateRow(rowStr, false); - this.pushRow(row); - this.objBuffer = undefined; - } else if (line === "{" || line === "[") { - this.objBuffer = line; - } else if (line.match(/^],?$/)) { - this.state = "rootKeys"; - } else if (this.objBuffer === undefined) { - const isLastRow = line[line.length - 1] !== ","; - const rowStr = isLastRow ? 
line : line.substring(0, line.length - 1); - const row = this.hydrateRow(rowStr, isLastRow); - this.pushRow(row); - } else { - this.objBuffer += line; - } - } - - handleData(line: string) { - if (line === "[") { - this.state = "data-array"; - } - } - - handleQuery(line: string) { - if (line === "{") { - this.objBuffer = "{"; - this.state = "query-object"; - } - } - - handleQueryObject(line: string) { - if (line.match(/^},?$/)) { - const queryStr = this.objBuffer + "}"; - const query = JSONbig.parse(queryStr); - this.objBuffer = undefined; - this.state = "rootKeys"; - this.pushQuery(query); - } else { - this.objBuffer += line; - } - } - - handleStatisticsObject(line: string) { - if (line.match(/^},?$/)) { - const queryStr = this.objBuffer + "}"; - const statistics = JSONbig.parse(queryStr); - this.objBuffer = undefined; - this.state = "rootKeys"; - this.pushStatistics(statistics); - } else { - this.objBuffer += line; - } - } - - processLine(line: string) { - line = line.trim(); - - if (!line.length) { - return; - } - - if (this.state === null) { - this.handleRoot(line); - } else if (this.state === "rootKeys") { - this.handleRootKeys(line); - } else if (this.state === "meta") { - this.handleMeta(line); - } else if (this.state === "data") { - this.handleData(line); - } else if (this.state === "meta-array") { - this.handleMetaArray(line); - } else if (this.state === "data-array") { - this.handleDataArray(line); - } else if (this.state === "query") { - this.handleQuery(line); - } else if (this.state === "query-object") { - this.handleQueryObject(line); - } else if (this.state === "statistics-object") { - this.handleStatisticsObject(line); - } - } - - processBody(body: string) { - const lines = body.split("\n"); - for (const line of lines) { - this.processLine(line); - } - } -} diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts new file mode 100644 index 00000000..18d7aa82 --- /dev/null +++ 
b/src/statement/stream/serverSideStream.ts @@ -0,0 +1,134 @@
+import { Readable } from "stream";
+import JSONbig from "json-bigint";
+import readline from "readline";
+import {
+  getNormalizedMeta,
+  normalizeResponseRowStreaming
+} from "../normalizeResponse";
+import { Response } from "node-fetch";
+import { ExecuteQueryOptions } from "../../types";
+import { Meta } from "../../meta";
+
+/**
+ * Object-mode Readable over a JSON-lines query response.
+ *
+ * Every non-empty line of the response body is parsed and dispatched on its
+ * `message_type`: `START` stores the normalized column metadata and emits a
+ * `meta` event, `DATA` pushes each normalized row, `FINISH_SUCCESSFULLY` ends
+ * the stream and `FINISH_WITH_ERRORS` destroys it with the reported errors.
+ * Other message types are ignored.
+ */
+export class ServerSideStream extends Readable {
+  private meta: Meta[] = [];
+  private readonly lineReader: readline.Interface;
+  // True once end-of-stream was signalled; later lines are ignored.
+  private finished = false;
+  // True while the line reader is paused because the consumer is slow.
+  private inputPaused = false;
+
+  constructor(
+    private readonly response: Response,
+    private readonly executeQueryOptions: ExecuteQueryOptions
+  ) {
+    super({ objectMode: true });
+    this.lineReader = readline.createInterface({
+      input: response.body,
+      crlfDelay: Infinity
+    });
+
+    this.lineReader.on("line", (line: string) => this.processLine(line));
+    // The body may end without a FINISH message; end the stream regardless.
+    this.lineReader.on("close", () => this.finish());
+    // Surface transport failures instead of leaving the consumer hanging.
+    response.body.on("error", (error: Error) => this.destroy(error));
+  }
+
+  /** Parses one line of the response and handles it by message type. */
+  private processLine(line: string) {
+    if (this.finished || this.destroyed || !line.trim()) {
+      return;
+    }
+    try {
+      const parsed = JSONbig.parse(line);
+      if (!parsed) {
+        this.destroy(new Error(`Result row could not be parsed: ${line}`));
+        return;
+      }
+      switch (parsed.message_type) {
+        case "START":
+          this.meta = getNormalizedMeta(parsed.result_columns);
+          this.emit("meta", this.meta);
+          break;
+        case "DATA":
+          this.processData(parsed);
+          break;
+        case "FINISH_SUCCESSFULLY":
+          this.finish();
+          break;
+        case "FINISH_WITH_ERRORS":
+          this.destroy(new Error(this.describeErrors(parsed.errors)));
+          break;
+      }
+    } catch (error) {
+      // Anything can be thrown; destroy() expects an Error instance.
+      this.destroy(error instanceof Error ? error : new Error(String(error)));
+    }
+  }
+
+  /** Builds the error message for a FINISH_WITH_ERRORS message. */
+  private describeErrors(errors: unknown): string {
+    const descriptions = Array.isArray(errors)
+      ? errors.map((error: { description: string }) => error.description)
+      : [];
+    return `Result encountered an error: ${descriptions.join("\n")}`;
+  }
+
+  /** Signals end-of-stream exactly once. */
+  private finish() {
+    if (this.finished || this.destroyed) {
+      return;
+    }
+    this.finished = true;
+    this.push(null);
+  }
+
+  /** Normalizes the rows of a DATA message and queues them for the consumer. */
+  private processData(parsed: { data: unknown[] }) {
+    if (!parsed.data) {
+      return;
+    }
+    const rows = normalizeResponseRowStreaming(
+      parsed.data,
+      this.executeQueryOptions,
+      this.meta
+    );
+    let wantsMore = true;
+    for (const row of rows) {
+      // push() buffers rows until a consumer reads them and respects pause().
+      wantsMore = this.push(row);
+    }
+    if (!wantsMore && !this.inputPaused) {
+      // Internal buffer is full: stop reading the body until _read() is called.
+      this.inputPaused = true;
+      this.lineReader.pause();
+    }
+  }
+
+  _read() {
+    // The consumer wants more rows: resume the body if it was paused.
+    if (this.inputPaused) {
+      this.inputPaused = false;
+      this.lineReader.resume();
+    }
+  }
+
+  _destroy(error: Error | null, callback: (error?: Error | null) => void) {
+    this.lineReader.close();
+    const body: unknown = this.response.body;
+    // Abort the download when the stream is torn down before the end.
+    if ((error || !this.finished) && body instanceof Readable) {
+      body.destroy();
+    }
+    callback(error);
+  }
+}
diff --git a/src/types.ts
b/src/types.ts index e8b152ff..1fd3208c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -26,7 +26,8 @@ export enum OutputFormat { // JSON_COMPACT_LIMITED = "FB_JSONCompactLimited", COMPACT = "JSON_Compact", // supported in v3 of packdb JSON_COMPACT = "JSONCompact", // to be as above, after ensure all clients has v3 - JSON = "JSON" + JSON = "JSON", + JSON_LINES = "JSONLines_Compact" } export type QuerySettings = Record< diff --git a/test/integration/v1/stream.test.ts b/test/integration/v1/stream.test.ts index 22000c8a..7f667fd7 100644 --- a/test/integration/v1/stream.test.ts +++ b/test/integration/v1/stream.test.ts @@ -1,5 +1,4 @@ -import stream, { TransformCallback } from "stream"; -import { Firebolt } from "../../../src/index"; +import { Firebolt } from "../../../src"; const connectionParams = { auth: { @@ -13,43 +12,19 @@ const connectionParams = { jest.setTimeout(20000); describe("streams", () => { - it("stream transformters", async () => { - class SerializeRowStream extends stream.Transform { - public constructor() { - super({ - objectMode: true, - transform( - row: any, - encoding: BufferEncoding, - callback: TransformCallback - ) { - const transformed = JSON.stringify(row); - this.push(transformed); - this.push("\n"); - callback(); - } - }); - } - } - + it("stream transformers", async () => { const firebolt = Firebolt({ apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string }); - const serializedStream = new SerializeRowStream(); - const connection = await firebolt.connect(connectionParams); - const statement = await connection.execute("select 1 union all select 2"); - - const { data } = await statement.streamResult(); - - data.pipe(serializedStream).pipe(process.stdout); - - await new Promise(resolve => { - data.on("end", () => { - resolve(null); - }); - }); + expect(() => + connection.executeStream( + `select 1 from generate_series(1, 250000000)` //~1 GB response + ) + ).rejects.toThrow( + Error("Stream execution is not supported in this Firebolt 
version.") + ); }); }); diff --git a/test/integration/v2/fetchTypes.test.ts b/test/integration/v2/fetchTypes.test.ts index 71638117..afa8f156 100644 --- a/test/integration/v2/fetchTypes.test.ts +++ b/test/integration/v2/fetchTypes.test.ts @@ -1,5 +1,6 @@ -import { Firebolt } from "../../../src/index"; +import { Firebolt } from "../../../src"; import BigNumber from "bignumber.js"; +import stream from "node:stream"; const connectionParams = { auth: { @@ -158,4 +159,153 @@ describe("test type casting on fetch", () => { await connection.execute("DROP TABLE IF EXISTS test_struct_helper"); } }); + //todo fix nullable types FIR-45354 + it("select all types", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect({ + ...connectionParams, + engineName: process.env.FIREBOLT_ENGINE_NAME as string + }); + const statement = await connection.execute( + "select 1 as col_int,\n" + + " null::int as col_int_null,\n" + + " 30000000000 as col_long,\n" + + " null::bigint as col_long_null,\n" + + " 1.23::float4 as col_float,\n" + + " null::float4 as col_float_null,\n" + + " 1.23456789012 as col_double,\n" + + " null::double as col_double_null,\n" + + " 'text' as col_text,\n" + + " null::text as col_text_null,\n" + + " '2021-03-28'::date as col_date,\n" + + " null::date as col_date_null,\n" + + " '2019-07-31 01:01:01'::timestamp as col_timestamp,\n" + + " null::timestamp as col_timestamp_null,\n" + + " '1111-01-05 17:04:42.123456'::timestamptz as col_timestamptz,\n" + + " null::timestamptz as col_timestamptz_null,\n" + + " true as col_boolean,\n" + + " null::bool as col_boolean_null,\n" + + " [1,2,3,4] as col_array,\n" + + // " null::array(int) as col_array_null,\n" + + " '1231232.123459999990457054844258706536'::decimal(38, 30) as col_decimal,\n" + + // " null::decimal(38, 30) as col_decimal_null,\n" + + " 'abc123'::bytea as col_bytea,\n" + + " null::bytea as col_bytea_null,\n" + + " 
'point(1 2)'::geography as col_geography,\n" + + " null::geography as col_geography_null," + ); + const { data, meta } = await statement.fetchResult(); + const metaObjects = [ + { name: "col_int", type: "int" }, + { name: "col_int_null", type: "int null" }, + { name: "col_long", type: "long" }, + { name: "col_long_null", type: "long null" }, + { name: "col_float", type: "float" }, + { name: "col_float_null", type: "float null" }, + { name: "col_double", type: "double" }, + { name: "col_double_null", type: "double null" }, + { name: "col_text", type: "text" }, + { name: "col_text_null", type: "text null" }, + { name: "col_date", type: "date" }, + { name: "col_date_null", type: "date null" }, + { name: "col_timestamp", type: "timestamp" }, + { name: "col_timestamp_null", type: "timestamp null" }, + { name: "col_timestamptz", type: "timestamptz" }, + { name: "col_timestamptz_null", type: "timestamptz null" }, + { name: "col_boolean", type: "boolean" }, + { name: "col_boolean_null", type: "boolean null" }, + { name: "col_array", type: "array(int)" }, + // { name: "col_array_null", type: "array(int) null" }, + // { name: "col_decimal", type: "decimal(38, 30)" }, + { name: "col_decimal", type: "decimal" }, + // { name: "col_decimal_null", type: "decimal(38, 30) null" }, + { name: "col_bytea", type: "bytea" }, + { name: "col_bytea_null", type: "bytea null" }, + { name: "col_geography", type: "geography" }, + { name: "col_geography_null", type: "geography null" } + ]; + for (let i = 0; i < meta.length; i++) { + expect(meta[i]).toEqual(metaObjects[i]); + } + }); + //todo fix nullable types FIR-45354 + it("select all types in streaming", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect({ + ...connectionParams, + engineName: process.env.FIREBOLT_ENGINE_NAME as string + }); + const statement = await connection.executeStream( + "select 1 as col_int,\n" + + " null::int as 
col_int_null,\n" + + " 30000000000 as col_long,\n" + + " null::bigint as col_long_null,\n" + + " 1.23::float4 as col_float,\n" + + " null::float4 as col_float_null,\n" + + " 1.23456789012 as col_double,\n" + + " null::double as col_double_null,\n" + + " 'text' as col_text,\n" + + " null::text as col_text_null,\n" + + " '2021-03-28'::date as col_date,\n" + + " null::date as col_date_null,\n" + + " '2019-07-31 01:01:01'::timestamp as col_timestamp,\n" + + " null::timestamp as col_timestamp_null,\n" + + " '1111-01-05 17:04:42.123456'::timestamptz as col_timestamptz,\n" + + " null::timestamptz as col_timestamptz_null,\n" + + " true as col_boolean,\n" + + " null::bool as col_boolean_null,\n" + + " [1,2,3,4] as col_array,\n" + + // " null::array(int) as col_array_null,\n" + + " '1231232.123459999990457054844258706536'::decimal(38, 30) as col_decimal,\n" + + // " null::decimal(38, 30) as col_decimal_null,\n" + + " 'abc123'::bytea as col_bytea,\n" + + " null::bytea as col_bytea_null,\n" + + " 'point(1 2)'::geography as col_geography,\n" + + " null::geography as col_geography_null," + ); + const { data } = await statement.streamResult(); + const [meta] = await stream.once(data, "meta"); + const metaObjects = [ + { name: "col_int", type: "int" }, + // { name: "col_int_null", type: "int null" }, + { name: "col_int_null", type: "integer null" }, + { name: "col_long", type: "long" }, + // { name: "col_long_null", type: "long null" }, + { name: "col_long_null", type: "bigint null" }, + { name: "col_float", type: "float" }, + // { name: "col_float_null", type: "float null" }, + { name: "col_float_null", type: "real null" }, + { name: "col_double", type: "double" }, + // { name: "col_double_null", type: "double null" }, + { name: "col_double_null", type: "double precision null" }, + { name: "col_text", type: "text" }, + { name: "col_text_null", type: "text null" }, + { name: "col_date", type: "date" }, + { name: "col_date_null", type: "date null" }, + { name: "col_timestamp", 
type: "timestamp" }, + { name: "col_timestamp_null", type: "timestamp null" }, + { name: "col_timestamptz", type: "timestamptz" }, + { name: "col_timestamptz_null", type: "timestamptz null" }, + { name: "col_boolean", type: "boolean" }, + { name: "col_boolean_null", type: "boolean null" }, + { name: "col_array", type: "array(int)" }, + // { name: "col_array_null", type: "array(int) null" }, + // { name: "col_decimal", type: "decimal(38, 30)" }, + { name: "col_decimal", type: "decimal" }, + // { name: "col_decimal_null", type: "decimal(38, 30) null" }, + { name: "col_bytea", type: "bytea" }, + { name: "col_bytea_null", type: "bytea null" }, + { name: "col_geography", type: "geography" }, + { name: "col_geography_null", type: "geography null" } + ]; + for (let i = 0; i < meta.length; i++) { + expect(meta[i]).toEqual(metaObjects[i]); + } + }); }); diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 0142ce7c..4ab00ab7 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -1,5 +1,5 @@ -import stream, { TransformCallback } from "stream"; -import { Firebolt } from "../../../src/index"; +import { Firebolt } from "../../../src"; +import stream, { TransformCallback } from "node:stream"; const connectionParams = { auth: { @@ -11,10 +11,71 @@ const connectionParams = { engineName: process.env.FIREBOLT_ENGINE_NAME as string }; -jest.setTimeout(40000); +jest.setTimeout(250000); describe("streams", () => { - it("stream transformters", async () => { + it("check sum from stream result", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + + const statement = await connection.executeStream( + `select 1 from generate_series(1, 2500000)` //~1 GB response + ); + + const { data } = await statement.streamResult(); + let sum = 0; + const meta = await stream.once(data, "meta"); + 
console.log("Meta:", meta); + + data.on("data", row => { + sum += row[0]; + }); + + await new Promise(resolve => { + data.on("end", () => { + expect(sum).toEqual(2500000); + resolve(null); + }); + }); + }); + it("check normalized data", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + + const statement = await connection.executeStream( + `select 1 from generate_series(1, 250000)`, //~1 GB response + { + response: { + normalizeData: true + } + } + ); + + const { data } = await statement.streamResult(); + let sum = 0; + + data + .on("meta", meta => { + console.log("Meta:", meta); + }) + .on("data", row => { + sum += row["?column?"]; + }); + + await new Promise(resolve => { + data.on("end", () => { + expect(sum).toEqual(250000); + resolve(null); + }); + }); + }); + it("stream transformers", async () => { class SerializeRowStream extends stream.Transform { public constructor() { super({ @@ -32,19 +93,17 @@ describe("streams", () => { }); } } - const firebolt = Firebolt({ apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string }); - - const serializedStream = new SerializeRowStream(); - const connection = await firebolt.connect(connectionParams); - const statement = await connection.execute("select 1 union all select 2"); + const serializedStream = new SerializeRowStream(); + const statement = await connection.executeStream( + "select * from generate_series(1, 10)" + ); const { data } = await statement.streamResult(); - data.pipe(serializedStream).pipe(process.stdout); await new Promise(resolve => { @@ -53,4 +112,45 @@ describe("streams", () => { }); }); }); + it("stream syntax error", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + try { + await connection.executeStream("select *1;"); + } catch (error) { + 
expect(error.message).toContain( + "Line 1, Column 9: syntax error, unexpected integer, expecting end of file" + ); + expect(error.message).toContain("select *1;"); + } + }); + it("stream division by 0 error", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + const statement = await connection.executeStream( + "select 1/(i-100000) as a from generate_series(1,100000) as i" + ); + + const { data } = await statement.streamResult(); + data.on("meta", meta => { + expect(meta).toEqual([ + { + name: "a", + type: "int" + } + ]); + }); + const [error] = await stream.once(data, "error"); + expect(error.message).toEqual( + "Result encountered an error: Line 1, Column 9: Division by zero\n" + + "select 1/(i-100000) as a from generate_series(1,...\n" + + " ^" + ); + }); }); diff --git a/test/unit/v2/connection.test.ts b/test/unit/v2/connection.test.ts index 58774cf2..ae45ea3c 100644 --- a/test/unit/v2/connection.test.ts +++ b/test/unit/v2/connection.test.ts @@ -4,6 +4,7 @@ import { Firebolt } from "../../../src"; import { ConnectionOptions } from "../../../src/types"; import { QUERY_URL } from "../../../src/common/api"; import { inMemoryCache } from "../../../src/common/tokenCache"; +import stream from "node:stream"; const apiEndpoint = "api.fake.firebolt.io"; @@ -400,12 +401,14 @@ describe("Connection V2", () => { setupMockServer(server); server.use( rest.post( - `https://some_system_engine.com/${QUERY_URL}?output_format=JSON_Compact`, + `https://some_system_engine.com/${QUERY_URL}`, async (req, res, ctx) => { const body = await req.text(); + const urlParams = Object.fromEntries(req.url.searchParams.entries()); if ( body.includes("fb_GetAsyncStatus") && - body.includes("async_query_token") + body.includes("async_query_token") && + urlParams["output_format"] === "JSON_Compact" ) { return res( ctx.json({ @@ -445,4 +448,230 @@ describe("Connection V2", () 
=> { await new Promise(resolve => setTimeout(resolve, 100)); // somehow we need it to wait for the flag switch expect(cancelQueryExecuted).toBe(true); }); + + it("streaming works as expected", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + const jsonLines = [ + JSON.stringify({ + message_type: "START", + result_columns: [ + { + name: "?column?", + type: "integer" + } + ] + }), + JSON.stringify({ + message_type: "DATA", + data: [[1], [1]] + }), + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + ].join("\n"); + + setupMockServer(server); + server.use( + rest.post( + `https://some_system_engine.com/${QUERY_URL}`, + async (req, res, ctx) => { + const body = await req.text(); + const urlParams = Object.fromEntries(req.url.searchParams.entries()); + if ( + body.includes("select") && + body.includes("generate_series") && + urlParams["output_format"] === "JSONLines_Compact" + ) { + return res(ctx.body(jsonLines)); + } + } + ) + ); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + const streamStatement = await connection.executeStream( + "select 1 from generate_series(1, 2))" + ); + let rowCount = 0; + const { data } = await streamStatement.streamResult(); + data + .on("meta", meta => { + expect(meta).toEqual([ + { + name: "?column?", + type: "int" + } + ]); + }) + .on("data", row => { + expect(row).toEqual([1]); + rowCount++; + }) + .on("end", () => { + expect(rowCount).toBe(2); + }); + }); + + it("streaming with normalization works as expected", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + const jsonLines = [ + JSON.stringify({ + message_type: "START", + result_columns: [ + { + name: "?column?", + type: "integer" + } + ] + }), + JSON.stringify({ + message_type: "DATA", + data: [[1], [1]] + }), + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + 
].join("\n"); + + setupMockServer(server); + server.use( + rest.post( + `https://some_system_engine.com/${QUERY_URL}`, + async (req, res, ctx) => { + const body = await req.text(); + const urlParams = Object.fromEntries(req.url.searchParams.entries()); + if ( + body.includes("select") && + body.includes("generate_series") && + urlParams["output_format"] === "JSONLines_Compact" + ) { + return res(ctx.body(jsonLines)); + } + } + ) + ); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + const streamStatement = await connection.executeStream( + "select 1 from generate_series(1, 2))", + { + response: { + normalizeData: true + } + } + ); + let rowCount = 0; + const { data } = await streamStatement.streamResult(); + data + .on("meta", meta => { + expect(meta).toEqual([ + { + name: "?column?", + type: "int" + } + ]); + }) + .on("data", row => { + expect(row).toEqual({ "?column?": 1 }); + rowCount++; + }) + .on("end", () => { + expect(rowCount).toBe(2); + }); + }); + + it("streaming fails with error", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + const jsonLines = [ + JSON.stringify({ + message_type: "START", + result_columns: [ + { + name: "?column?", + type: "integer" + } + ] + }), + JSON.stringify({ + message_type: "FINISH_WITH_ERRORS", + errors: [ + { + description: + "Line 1, Column 9: syntax error, unexpected identifier, expecting end of file select *1;", + location: { + failing_line: 1, + start_offset: 9 + } + } + ] + }) + ].join("\n"); + + setupMockServer(server); + server.use( + rest.post( + `https://some_system_engine.com/${QUERY_URL}`, + async (req, res, ctx) => { + const body = await req.text(); + const urlParams = Object.fromEntries(req.url.searchParams.entries()); + if ( + body.includes("select *1;") && + urlParams["output_format"] === "JSONLines_Compact" + ) { + return 
res(ctx.body(jsonLines)); + } + } + ) + ); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + const streamStatement = await connection.executeStream("select *1;"); + const { data } = await streamStatement.streamResult(); + data + .on("meta", meta => { + expect(meta).toEqual([ + { + name: "?column?", + type: "int" + } + ]); + }) + .on("data", () => { + fail('"Data should not be emitted"'); + }); + const [error] = await stream.once(data, "error"); + expect(error.message).toEqual( + "Result encountered an error: Line 1, Column 9: syntax error, unexpected identifier, expecting end of file select *1;" + ); + }); });