From 3c8b12f220a613ba0d7fc093519aedb5e560fbf8 Mon Sep 17 00:00:00 2001 From: Bogdan Truta Date: Thu, 10 Apr 2025 13:40:13 +0300 Subject: [PATCH 1/9] feat: FIR-43724 implement streaming in node sdk --- .gitignore | 1 + package-lock.json | 37 +++++++++++++- src/connection/base.ts | 7 +++ src/connection/connection_v1.ts | 10 ++++ src/connection/connection_v2.ts | 21 +++++++- src/statement/normalizeResponse.ts | 30 ++++++++--- src/statement/stream.ts | 28 ++++++++++ src/statement/stream/serverSideStream.ts | 65 ++++++++++++++++++++++++ src/types.ts | 3 +- test/integration/v1/stream.test.ts | 48 +++++++---------- 10 files changed, 212 insertions(+), 38 deletions(-) create mode 100644 src/statement/stream.ts create mode 100644 src/statement/stream/serverSideStream.ts diff --git a/.gitignore b/.gitignore index b9bd3ab3..0184f9f7 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ npm-debug.log* .#* coverage/ allure-results/ +.idea diff --git a/package-lock.json b/package-lock.json index b3730077..bff5527d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,8 @@ "abort-controller": "^3.0.0", "agentkeepalive": "^4.5.0", "json-bigint": "^1.0.0", - "node-fetch": "^2.6.6" + "node-fetch": "^2.6.6", + "stream": "^0.0.3" }, "devDependencies": { "@types/jest": "^29.5.12", @@ -2718,6 +2719,18 @@ "dot-prop": "^5.1.0" } }, + "node_modules/component-emitter": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-2.0.0.tgz", + "integrity": "sha512-4m5s3Me2xxlVKG9PkZpQqHQR7bgpnN7joDMJ4yvVkVXngjoITG76IaZmzmywSeRTeTpc6N6r3H3+KyUurV8OYw==", + "license": "MIT", + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/concat-map": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", @@ -8021,6 +8034,15 @@ "node": ">= 0.8" } }, + "node_modules/stream": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/stream/-/stream-0.0.3.tgz", + "integrity": "sha512-aMsbn7VKrl4A2T7QAQQbzgN7NVc70vgF5INQrBXqn4dCXN1zy3L9HGgLO5s7PExmdrzTJ8uR/27aviW8or8/+A==", + "license": "MIT", + "dependencies": { + "component-emitter": "^2.0.0" + } + }, "node_modules/strict-event-emitter": { "version": "0.2.4", "resolved": "https://registry.npmjs.org/strict-event-emitter/-/strict-event-emitter-0.2.4.tgz", @@ -10813,6 +10835,11 @@ "dot-prop": "^5.1.0" } }, + "component-emitter": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-2.0.0.tgz", + "integrity": "sha512-4m5s3Me2xxlVKG9PkZpQqHQR7bgpnN7joDMJ4yvVkVXngjoITG76IaZmzmywSeRTeTpc6N6r3H3+KyUurV8OYw==" + }, "concat-map": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", @@ -14780,6 +14807,14 @@ "integrity": "sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ==", "dev": true }, + "stream": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/stream/-/stream-0.0.3.tgz", + "integrity": "sha512-aMsbn7VKrl4A2T7QAQQbzgN7NVc70vgF5INQrBXqn4dCXN1zy3L9HGgLO5s7PExmdrzTJ8uR/27aviW8or8/+A==", + "requires": { + "component-emitter": "^2.0.0" + } + }, "strict-event-emitter": { "version": "0.2.4", "resolved": "https://registry.npmjs.org/strict-event-emitter/-/strict-event-emitter-0.2.4.tgz", diff --git a/src/connection/base.ts b/src/connection/base.ts index bb2132f5..69f5dc4d 100644 --- a/src/connection/base.ts +++ b/src/connection/base.ts @@ -10,6 +10,8 @@ import { 
CompositeError } from "../common/errors"; import JSONbig from "json-bigint"; import { QueryFormatter } from "../formatter/base"; import { AsyncStatement } from "../statement/async"; +import { StreamStatement } from "../statement/stream"; +import { Response, Headers } from "node-fetch"; const defaultQuerySettings = { output_format: OutputFormat.COMPACT @@ -166,6 +168,11 @@ export abstract class Connection { executeQueryOptions?: ExecuteQueryOptions ): Promise; + abstract executeStream( + query: string, + executeQueryOptions?: ExecuteQueryOptions + ): Promise; + abstract isAsyncQueryRunning(token: string): Promise; abstract isAsyncQuerySuccessful(token: string): Promise; diff --git a/src/connection/connection_v1.ts b/src/connection/connection_v1.ts index 9083599f..89f32e7c 100644 --- a/src/connection/connection_v1.ts +++ b/src/connection/connection_v1.ts @@ -3,6 +3,7 @@ import { Connection as BaseConnection } from "./base"; import { ResourceManager } from "../service"; import { ExecuteQueryOptions } from "../types"; import { AsyncStatement } from "../statement/async"; +import { StreamStatement } from "../statement/stream"; export interface AccountInfo { id: string; @@ -93,4 +94,13 @@ export class ConnectionV1 extends BaseConnection { "Asynchronous execution is not supported in this Firebolt version." ); } + + async executeStream( + query: string, + executeQueryOptions?: ExecuteQueryOptions + ): Promise { + throw new Error( + "Asynchronous execution is not supported in this Firebolt version." + ); + } } diff --git a/src/connection/connection_v2.ts b/src/connection/connection_v2.ts index b996b9c4..f99edba8 100644 --- a/src/connection/connection_v2.ts +++ b/src/connection/connection_v2.ts @@ -3,8 +3,9 @@ import { ACCOUNT_SYSTEM_ENGINE, QUERY_URL } from "../common/api"; import { Connection as BaseConnection } from "./base"; import { Cache, inMemoryCache, noneCache } from "../common/tokenCache"; -import { ExecuteQueryOptions } from "../types"; +import { ExecuteQueryOptions, OutputFormat } from "../types"; import { AsyncStatement } from "../statement/async"; +import { StreamStatement } from "../statement/stream"; export class ConnectionV2 extends BaseConnection { private get account(): string { @@ -103,6 +104,24 @@ export class ConnectionV2 extends BaseConnection { }); } + async executeStream( + query: string, + executeQueryOptions: ExecuteQueryOptions = {} + ): Promise { + const { response } = await this.prepareAndExecuteQuery(query, { + ...executeQueryOptions, + settings: { + ...executeQueryOptions?.settings, + output_format: OutputFormat.JSON_LINES + } + }); + + return new StreamStatement({ + response, + executeQueryOptions + }); + } + private async getAsyncQueryInfo(token: string) { const query = `CALL fb_GetAsyncStatus('${token}')`; diff --git a/src/statement/normalizeResponse.ts b/src/statement/normalizeResponse.ts index 5532b96b..2798819c 100644 --- a/src/statement/normalizeResponse.ts +++ b/src/statement/normalizeResponse.ts @@ -1,7 +1,7 @@ import { - QueryResponse, - OutputFormat, ExecuteQueryOptions, + OutputFormat, + QueryResponse, Row } from "../types"; import { Meta } from "../meta"; @@ -35,11 +35,11 @@ export const normalizeColumn = (column: { name: string; type: string }) => { return new Meta(column); }; -const getNormalizedMeta = (response: ParsedResponse): Meta[] => { - if (!response.meta) { +export const getNormalizedMeta = (meta: any[]): Meta[] => { + if (!meta) { return []; } - return response.meta.map(normalizeColumn); + return meta.map(normalizeColumn); }; export const 
getNormalizedStatistics = (response: ParsedResponse) => { @@ -71,7 +71,7 @@ export const normalizeResponse = ( const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow; - const meta = getNormalizedMeta(response); + const meta = getNormalizedMeta(response.meta); const statistics = getNormalizedStatistics(response); @@ -96,3 +96,21 @@ export const normalizeResponse = ( statistics }; }; + +export const normalizeResponseRowStreaming = ( + data: any[], + executeQueryOptions: ExecuteQueryOptions, + meta: Meta[] +): Row[] => { + const { response: { normalizeData = false } = {} } = executeQueryOptions; + + const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow; + + return data.map((row: Row) => { + const hydratedRow = hydrate(row, meta, executeQueryOptions); + if (normalizeData) { + return normalizeRow(hydratedRow, meta, executeQueryOptions); + } + return hydratedRow; + }); +}; diff --git a/src/statement/stream.ts b/src/statement/stream.ts new file mode 100644 index 00000000..c0b25c12 --- /dev/null +++ b/src/statement/stream.ts @@ -0,0 +1,28 @@ +import { ExecuteQueryOptions } from "../types"; +import { ServerSideStream } from "./stream/serverSideStream"; +import { Response } from "node-fetch"; + +export class StreamStatement { + private readonly executeQueryOptions: ExecuteQueryOptions; + private readonly response: Response; + + constructor({ + response, + executeQueryOptions + }: { + response: Response; + executeQueryOptions: ExecuteQueryOptions; + }) { + this.response = response; + this.executeQueryOptions = executeQueryOptions; + } + + async streamResult() { + if (!this.response.body) { + throw new Error("Response body is not readable"); + } + return { + data: new ServerSideStream(this.response, this.executeQueryOptions) + }; + } +} diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts new file mode 100644 index 00000000..b70cc2b0 --- /dev/null +++ b/src/statement/stream/serverSideStream.ts @@ -0,0 +1,65 @@ +import { Readable } from "stream"; +import JSONbig from "json-bigint"; +import readline from "readline"; +import { + getNormalizedMeta, + normalizeResponseRowStreaming +} from "../normalizeResponse"; +import { Response } from "node-fetch"; +import { ExecuteQueryOptions } from "../../types"; +import { Meta } from "../../meta"; + +export class ServerSideStream extends Readable { + private meta: Meta[] = []; + constructor( + private readonly response: Response, + private readonly executeQueryOptions: ExecuteQueryOptions + ) { + super({ objectMode: true }); + const readLine = readline.createInterface({ + input: response.body, + crlfDelay: Infinity + }); + + const lineParser = (line: string) => { + try { + if (line.trim()) { + const parsed = JSONbig.parse(line); + if (parsed) { + if (parsed.message_type === "DATA") { + if (parsed.data) { + const normalizedData = normalizeResponseRowStreaming( + parsed.data, + executeQueryOptions, + this.meta + ); + for (const data of normalizedData) { + this.emit("data", data); + } + } + } else if (parsed.message_type === "START") { + const metaDataJson = JSONbig.parse(line); + this.meta = getNormalizedMeta(metaDataJson.result_columns); + this.emit("meta", this.meta); + } else if (parsed.message_type === "FINISH_SUCCESSFULLY") { + this.push(null); + } else if (parsed.message_type === "FINISH_WITH_ERROR") { + this.destroy(new Error(`Result encountered an error: ${line}`)); + } + } else { + this.destroy(new Error(`Result row could not be parsed: ${line}`)); + } + } + } catch (err) { + 
this.destroy(err); + } + }; + readLine.on("line", lineParser); + + readLine.on("close", () => { + this.push(null); + }); + } + + _read() {} +} diff --git a/src/types.ts b/src/types.ts index e8b152ff..1fd3208c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -26,7 +26,8 @@ export enum OutputFormat { // JSON_COMPACT_LIMITED = "FB_JSONCompactLimited", COMPACT = "JSON_Compact", // supported in v3 of packdb JSON_COMPACT = "JSONCompact", // to be as above, after ensure all clients has v3 - JSON = "JSON" + JSON = "JSON", + JSON_LINES = "JSONLines_Compact" } export type QuerySettings = Record< diff --git a/test/integration/v1/stream.test.ts b/test/integration/v1/stream.test.ts index 22000c8a..22ebd77c 100644 --- a/test/integration/v1/stream.test.ts +++ b/test/integration/v1/stream.test.ts @@ -1,53 +1,43 @@ -import stream, { TransformCallback } from "stream"; -import { Firebolt } from "../../../src/index"; +import { Firebolt } from "../../../src"; const connectionParams = { auth: { - username: process.env.FIREBOLT_USERNAME as string, - password: process.env.FIREBOLT_PASSWORD as string + client_id: process.env.FIREBOLT_CLIENT_ID as string, + client_secret: process.env.FIREBOLT_CLIENT_SECRET as string }, + account: process.env.FIREBOLT_ACCOUNT as string, database: process.env.FIREBOLT_DATABASE as string, engineName: process.env.FIREBOLT_ENGINE_NAME as string }; -jest.setTimeout(20000); +jest.setTimeout(250000); describe("streams", () => { - it("stream transformters", async () => { - class SerializeRowStream extends stream.Transform { - public constructor() { - super({ - objectMode: true, - transform( - row: any, - encoding: BufferEncoding, - callback: TransformCallback - ) { - const transformed = JSON.stringify(row); - this.push(transformed); - this.push("\n"); - callback(); - } - }); - } - } - + it("stream transformers", async () => { const firebolt = Firebolt({ apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string }); - const serializedStream = new SerializeRowStream(); - const connection = await firebolt.connect(connectionParams); - const statement = await connection.execute("select 1 union all select 2"); + const statement = await connection.executeStream( + `select 1 from generate_series(1, 250000000)` //~1 GB response + ); const { data } = await statement.streamResult(); - - data.pipe(serializedStream).pipe(process.stdout); + let sum = 0; + + data + .on("meta", meta => { + console.log("Meta:", meta); + }) + .on("data", row => { + sum += row[0]; + }); await new Promise(resolve => { data.on("end", () => { + expect(sum).toEqual(250000000); resolve(null); }); }); From a27943d1e2716fa54508f3aa87f5d82acd1e7fdf Mon Sep 17 00:00:00 2001 From: Bogdan Truta Date: Thu, 10 Apr 2025 15:09:23 +0300 Subject: [PATCH 2/9] add tests --- src/connection/connection_v1.ts | 2 +- src/statement/stream/jsonStream.ts | 71 ----------------------------- test/integration/v1/stream.test.ts | 33 ++++---------- test/integration/v2/stream.test.ts | 73 ++++++++++++++++++++---------- 4 files changed, 58 insertions(+), 121 deletions(-) delete mode 100644 src/statement/stream/jsonStream.ts diff --git a/src/connection/connection_v1.ts b/src/connection/connection_v1.ts index 89f32e7c..7a5a31e7 100644 --- a/src/connection/connection_v1.ts +++ b/src/connection/connection_v1.ts @@ -100,7 +100,7 @@ export class ConnectionV1 extends BaseConnection { executeQueryOptions?: ExecuteQueryOptions ): Promise { throw new Error( - "Asynchronous execution is not supported in this Firebolt version." 
+ "Stream execution is not supported in this Firebolt version." ); } } diff --git a/src/statement/stream/jsonStream.ts b/src/statement/stream/jsonStream.ts deleted file mode 100644 index 657bbd73..00000000 --- a/src/statement/stream/jsonStream.ts +++ /dev/null @@ -1,71 +0,0 @@ -import JSONbig from "json-bigint"; -import { StreamOptions, ExecuteQueryOptions, RowParser } from "../../types"; -import { Meta } from "../../meta"; -import { normalizeColumn, normalizeRow } from "../normalizeResponse"; -import { hydrateRow } from "../hydrateResponse"; -import { RowStream } from "./rowStream"; -import { JSONParser } from "./parser"; - -export class JSONStream { - jsonParser: JSONParser; - options?: StreamOptions; - executeQueryOptions: ExecuteQueryOptions; - emitter: RowStream; - rowParser: RowParser; - - constructor({ - emitter, - options, - executeQueryOptions - }: { - emitter: RowStream; - options?: StreamOptions; - executeQueryOptions: ExecuteQueryOptions; - }) { - this.emitter = emitter; - this.options = options; - this.executeQueryOptions = executeQueryOptions; - this.rowParser = this.options?.rowParser || this.defaultRowParser; - - this.jsonParser = new JSONParser({ - onMetadataParsed: columns => { - this.emitter.emit("metadata", columns); - }, - hydrateRow: this.rowParser, - hydrateColumn: (columnStr: string) => { - const column = JSONbig.parse(columnStr); - return normalizeColumn(column); - } - }); - } - - defaultRowParser = (row: string, isLastRow: boolean) => { - const normalizeData = this.executeQueryOptions.response?.normalizeData; - const parsed = JSONbig.parse(row); - const hydrate = this.executeQueryOptions.response?.hydrateRow || hydrateRow; - const result = this.getResult(0); - const columns = result.columns; - const hydratedRow = hydrate( - parsed, - columns as Meta[], - this.executeQueryOptions - ); - if (normalizeData) { - const normalizedRow = normalizeRow( - hydratedRow, - columns as Meta[], - this.executeQueryOptions - ); - return normalizedRow; - } - return hydratedRow; - }; - - processLine(line: string) { - this.jsonParser.processLine(line); - } - - getResult(index: number) { - return this.jsonParser.results[index]; - } -} diff --git a/test/integration/v1/stream.test.ts b/test/integration/v1/stream.test.ts index 22ebd77c..7f667fd7 100644 --- a/test/integration/v1/stream.test.ts +++ b/test/integration/v1/stream.test.ts @@ -2,15 +2,14 @@ import { Firebolt } from "../../../src"; const connectionParams = { auth: { - client_id: process.env.FIREBOLT_CLIENT_ID as string, - client_secret: process.env.FIREBOLT_CLIENT_SECRET as string + username: process.env.FIREBOLT_USERNAME as string, + password: process.env.FIREBOLT_PASSWORD as string }, - account: process.env.FIREBOLT_ACCOUNT as string, database: process.env.FIREBOLT_DATABASE as string, engineName: process.env.FIREBOLT_ENGINE_NAME as string }; -jest.setTimeout(250000); +jest.setTimeout(20000); describe("streams", () => { it("stream transformers", async () => { @@ -20,26 +19,12 @@ describe("streams", () => { const connection = await firebolt.connect(connectionParams); - const statement = await connection.executeStream( - `select 1 from generate_series(1, 250000000)` //~1 GB response + expect(() => + connection.executeStream( + `select 1 from generate_series(1, 250000000)` //~1 GB response + ) + ).rejects.toThrow( + Error("Stream execution is not supported in this Firebolt version.") ); - - const { data } = await statement.streamResult(); - let sum = 0; - - data - .on("meta", meta => { - console.log("Meta:", meta); - }) - 
.on("data", row => { - sum += row[0]; - }); - - await new Promise(resolve => { - data.on("end", () => { - expect(sum).toEqual(250000000); - resolve(null); - }); - }); }); }); diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 0142ce7c..5108ba3d 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -1,5 +1,4 @@ -import stream, { TransformCallback } from "stream"; -import { Firebolt } from "../../../src/index"; +import { Firebolt } from "../../../src"; const connectionParams = { auth: { @@ -11,44 +10,68 @@ const connectionParams = { engineName: process.env.FIREBOLT_ENGINE_NAME as string }; -jest.setTimeout(40000); +jest.setTimeout(250000); describe("streams", () => { - it("stream transformters", async () => { - class SerializeRowStream extends stream.Transform { - public constructor() { - super({ - objectMode: true, - transform( - row: any, - encoding: BufferEncoding, - callback: TransformCallback - ) { - const transformed = JSON.stringify(row); - this.push(transformed); - this.push("\n"); - callback(); - } - }); - } - } - + it("check sum from stream result", async () => { const firebolt = Firebolt({ apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string }); - const serializedStream = new SerializeRowStream(); + const connection = await firebolt.connect(connectionParams); + + const statement = await connection.executeStream( + `select 1 from generate_series(1, 2500000)` //~1 GB response + ); + + const { data } = await statement.streamResult(); + let sum = 0; + + data + .on("meta", meta => { + console.log("Meta:", meta); + }) + .on("data", row => { + sum += row[0]; + }); + + await new Promise(resolve => { + data.on("end", () => { + expect(sum).toEqual(2500000); + resolve(null); + }); + }); + }); + it("check normalized data", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); const connection = await firebolt.connect(connectionParams); - const statement = await connection.execute("select 1 union all select 2"); + const statement = await connection.executeStream( + `select 1 from generate_series(1, 250000)`, //~1 GB response + { + response: { + normalizeData: true + } + } + ); const { data } = await statement.streamResult(); + let sum = 0; - data.pipe(serializedStream).pipe(process.stdout); + data + .on("meta", meta => { + console.log("Meta:", meta); + }) + .on("data", row => { + sum += row["?column?"]; + }); await new Promise(resolve => { data.on("end", () => { + expect(sum).toEqual(250000); resolve(null); }); }); From f3b02b42144c7911b15dac24ee18df4e439042c9 Mon Sep 17 00:00:00 2001 From: Bogdan Truta Date: Thu, 10 Apr 2025 16:50:34 +0300 Subject: [PATCH 3/9] add unit tests --- src/statement/stream.ts | 3 - test/unit/v2/connection.test.ts | 228 +++++++++++++++++++++++++++++++- 2 files changed, 226 insertions(+), 5 deletions(-) diff --git a/src/statement/stream.ts b/src/statement/stream.ts index c0b25c12..25d10254 100644 --- a/src/statement/stream.ts +++ b/src/statement/stream.ts @@ -18,9 +18,6 @@ export class StreamStatement { } async streamResult() { - if (!this.response.body) { - throw new Error("Response body is not readable"); - } return { data: new ServerSideStream(this.response, this.executeQueryOptions) }; diff --git a/test/unit/v2/connection.test.ts b/test/unit/v2/connection.test.ts index 58774cf2..21b47c16 100644 --- a/test/unit/v2/connection.test.ts +++ b/test/unit/v2/connection.test.ts @@ -400,12 +400,14 @@ describe("Connection V2", () => { 
setupMockServer(server); server.use( rest.post( - `https://some_system_engine.com/${QUERY_URL}?output_format=JSON_Compact`, + `https://some_system_engine.com/${QUERY_URL}`, async (req, res, ctx) => { const body = await req.text(); + const urlParams = Object.fromEntries(req.url.searchParams.entries()); if ( body.includes("fb_GetAsyncStatus") && - body.includes("async_query_token") + body.includes("async_query_token") && + urlParams["output_format"] === "JSON_Compact" ) { return res( ctx.json({ @@ -445,4 +447,226 @@ describe("Connection V2", () => { await new Promise(resolve => setTimeout(resolve, 100)); // somehow we need it to wait for the flag switch expect(cancelQueryExecuted).toBe(true); }); + + it("streaming works as expected", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + const jsonLines = [ + JSON.stringify({ + message_type: "START", + result_columns: [ + { + name: "?column?", + type: "integer" + } + ] + }), + JSON.stringify({ + message_type: "DATA", + data: [[1], [1]] + }), + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + ].join("\n"); + + setupMockServer(server); + server.use( + rest.post( + `https://some_system_engine.com/${QUERY_URL}`, + async (req, res, ctx) => { + const body = await req.text(); + const urlParams = Object.fromEntries(req.url.searchParams.entries()); + if ( + body.includes("select") && + body.includes("generate_series") && + urlParams["output_format"] === "JSONLines_Compact" + ) { + return res(ctx.body(jsonLines)); + } + } + ) + ); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + const streamStatement = await connection.executeStream( + "select 1 from generate_series(1, 2))" + ); + let rowCount = 0; + const { data } = await streamStatement.streamResult(); + data + .on("meta", meta => { + expect(meta).toEqual([ + { + name: "?column?", + type: "integer" + } + ]); + }) + .on("data", row => { + expect(row).toEqual([1]); + rowCount++; + }) + .on("end", () => { + expect(rowCount).toBe(2); + }); + }); + + it("streaming with normalization works as expected", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + const jsonLines = [ + JSON.stringify({ + message_type: "START", + result_columns: [ + { + name: "?column?", + type: "integer" + } + ] + }), + JSON.stringify({ + message_type: "DATA", + data: [[1], [1]] + }), + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + ].join("\n"); + + setupMockServer(server); + server.use( + rest.post( + `https://some_system_engine.com/${QUERY_URL}`, + async (req, res, ctx) => { + const body = await req.text(); + const urlParams = Object.fromEntries(req.url.searchParams.entries()); + if ( + body.includes("select") && + body.includes("generate_series") && + urlParams["output_format"] === "JSONLines_Compact" + ) { + return res(ctx.body(jsonLines)); + } + } + ) + ); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + const streamStatement = await connection.executeStream( + "select 1 from generate_series(1, 2))", + { + response: { + normalizeData: true + } + } + ); + let rowCount = 0; + const { data } = await streamStatement.streamResult(); + data + .on("meta", meta => { + expect(meta).toEqual([ + { + name: "?column?", + type: "integer" + } + ]); + }) + .on("data", row => { 
+ expect(row).toEqual({ "?column?": 1 }); + rowCount++; + }) + .on("end", () => { + expect(rowCount).toBe(2); + }); + }); + + it("streaming fails with error", async () => { + const firebolt = Firebolt({ + apiEndpoint + }); + const jsonLines = [ + JSON.stringify({ + message_type: "START", + result_columns: [ + { + name: "?column?", + type: "integer" + } + ] + }), + JSON.stringify({ + message_type: "FINISH_WITH_ERROR" + }) + ].join("\n"); + + setupMockServer(server); + server.use( + rest.post( + `https://some_system_engine.com/${QUERY_URL}`, + async (req, res, ctx) => { + const body = await req.text(); + const urlParams = Object.fromEntries(req.url.searchParams.entries()); + if ( + body.includes("select") && + body.includes("generate_series") && + urlParams["output_format"] === "JSONLines_Compact" + ) { + return res(ctx.body(jsonLines)); + } + } + ) + ); + + const connectionParams: ConnectionOptions = { + auth: { + client_id: "dummy", + client_secret: "dummy" + }, + account: "my_account" + }; + + const connection = await firebolt.connect(connectionParams); + const streamStatement = await connection.executeStream( + "select 1 from generate_series(1, 2))" + ); + const { data } = await streamStatement.streamResult(); + data + .on("meta", meta => { + expect(meta).toEqual([ + { + name: "?column?", + type: "integer" + } + ]); + }) + .on("data", row => { + fail('"Data should not be emitted"'); + }) + .on("error", error => { + expect(error).toEqual( + new Error( + 'Result encountered an error: {"message_type":"FINISH_WITH_ERROR"}' + ) + ); + }); + }); }); From c2fba3f0645379f4241629eedb20102ed09d9d6e Mon Sep 17 00:00:00 2001 From: Bogdan Truta Date: Tue, 15 Apr 2025 23:16:33 +0300 Subject: [PATCH 4/9] adress CR --- README.md | 66 ++++--- src/index.ts | 2 - src/statement/stream/parser.test.ts | 95 ---------- src/statement/stream/parser.ts | 213 ----------------------- src/statement/stream/serverSideStream.ts | 13 +- test/integration/v2/stream.test.ts | 92 +++++++++- test/unit/v2/connection.test.ts | 33 ++-- 7 files changed, 158 insertions(+), 356 deletions(-) delete mode 100644 src/statement/stream/parser.test.ts delete mode 100644 src/statement/stream/parser.ts diff --git a/README.md b/README.md index 2906e579..20ddc754 100644 --- a/README.md +++ b/README.md @@ -617,37 +617,38 @@ await database.delete(); ### Streaming results -The recommended way to consume query results is by using streams. +Streaming can only be used with a v2 connection and only using the execute stream method. +The streamResult method from normal the normal execute method returns an in-memory stream of result, rather than dynamically fetching them from the response. -For convenience, `statement.streamResult` also returns `meta: Promise` and `statistics: Promise`, which are wrappers over `data.on('metadata')` and `data.on('statistics')`. 
+The recommended way to consume query results is by using streams with standard events: +* `data.on('meta')` +* `data.on('data')` +* `data.on('end')` +* `data.on('error')` ```typescript const firebolt = Firebolt(); const connection = await firebolt.connect(connectionParams); -const statement = await connection.execute("SELECT 1"); +const statement = await connection.executeStream("SELECT 1"); -const { - data, - meta: metaPromise, - statistics: statisticsPromise -} = await statement.streamResult(); +const { data } = await statement.streamResult(); const rows: unknown[] = []; -const meta = await metaPromise; - -for await (const row of data) { - rows.push(row); -} +const meta = await stream.once(data, "meta"); -const statistics = await statisticsPromise +data.on("data", data => {; + rows.push(data); +}); console.log(meta); -console.log(statistics); console.log(rows) ``` +In case an errors occurs before streaming, or during the first packet, the error will be thrown by the executeStream method. If the error occurs during streaming, it will be emitted by the stream. + +```typescript ### Custom stream transformers @@ -679,7 +680,7 @@ const serializedStream = new SerializeRowStream() const firebolt = Firebolt(); const connection = await firebolt.connect(connectionParams); -const statement = await connection.execute("select 1 union all select 2"); +const statement = await connection.executeStream("select * from generate_series(1, 1000)"); const { data } = await statement.streamResult(); @@ -687,14 +688,37 @@ const { data } = await statement.streamResult(); data.pipe(serializedStream).pipe(process.stdout); ``` -Or use `rowParser` that returns strings or Buffer: + +### In-memory stream + +When using the streamResult method on the object returned from a simple execute method, the driver will return an in-memory stream of the result. This is useful for small result sets, but not recommended for large result sets. 
```typescript -const { data } = await statement.streamResult({ - rowParser: (row: string) => `${row}\n` -}); +const firebolt = Firebolt(); + +const connection = await firebolt.connect(connectionParams); -data.pipe(process.stdout); +const statement = await connection.execute("SELECT 1"); + +const { + data, + meta: metaPromise, + statistics: statisticsPromise +} = await statement.streamResult(); + +const rows: unknown[] = []; + +const meta = await metaPromise; + +for await (const row of data) { + rows.push(row); +} + +const statistics = await statisticsPromise + +console.log(meta); +console.log(statistics); +console.log(rows) ``` ## Development process diff --git a/src/index.ts b/src/index.ts index 7eee8b4a..2b3ac721 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,7 +38,5 @@ export { QueryFormatter } from "./formatter/base"; -export { JSONParser } from "./statement/stream/parser"; - export type { Connection } from "./connection"; export type { Meta } from "./meta"; diff --git a/src/statement/stream/parser.test.ts b/src/statement/stream/parser.test.ts deleted file mode 100644 index 283609c8..00000000 --- a/src/statement/stream/parser.test.ts +++ /dev/null @@ -1,95 +0,0 @@ -import { JSONParser } from "./parser"; - -const body1 = ` -{ - "query": - { - "query_id": "50cd4109-02de-4e19-b995-5e71a5f16fb7" - }, - "meta": - [ - { - "name": "engine_name", - "type": "text" - } - ], - - "data": - [ - ["peacekeeper_ns_2023_01_20_08_06_45_153_create_engine_test"], - ["Aymeric_test_2_Analytics"], - ["integration_testing_windowslatest_37_1675858870"], - ["test_4"], - ["peacekeeper_ns_2023_01_21_11_03_09_210_CREATE_ENGINE_TEST_2_3"] - ], - - "rows": 5, - - "rows_before_limit_at_least": 233, - - "statistics": - { - "elapsed": 0.42408089, - "rows_read": 233, - "bytes_read": 30680, - "time_before_execution": 0.000588018, - "time_to_execute": 0.423289816, - "scanned_bytes_cache": 0, - "scanned_bytes_storage": 0 - } -} -`; -const body2 = ` -{ -"query": -{ -"query_id": "50cd4109-02de-4e19-b995-5e71a5f16fb9" -}, -"meta": -[ -{ -"name": "engine_name", -"type": "text" -} -], - -"data": -[ -["peacekeeper_ns_2023_01_20_08_06_45_153_create_engine_test"], -["Aymeric_test_2_Analytics"], -["integration_testing_windowslatest_37_1675858870"] -], - -"rows": 3, - -"rows_before_limit_at_least": 233, - -"statistics": -{ -"elapsed": 0.42408089, -"rows_read": 233, -"bytes_read": 30680, -"time_before_execution": 0.000588018, -"time_to_execute": 0.423289816, -"scanned_bytes_cache": 0, -"scanned_bytes_storage": 0 -} -} -`; -describe("parser", () => { - it("handles single reponse", () => { - const parser = new JSONParser({}); - parser.processBody(body1); - expect(parser.results[0].rows).toHaveLength(5); - expect(parser.results[0].columns).toHaveLength(1); - }); - it("handles multi response", () => { - const parser = new JSONParser({}); - parser.processBody(` -${body1} -${body2}`); - expect(parser.results[0].rows).toHaveLength(5); - expect(parser.results[1].rows).toHaveLength(3); - expect(parser.results[1].columns).toHaveLength(1); - }); -}); diff --git a/src/statement/stream/parser.ts b/src/statement/stream/parser.ts deleted file mode 100644 index 31a2a6e2..00000000 --- a/src/statement/stream/parser.ts +++ /dev/null @@ -1,213 +0,0 @@ -import JSONbig from "json-bigint"; - -export class JSONParser { - state: - | "meta" - | "meta-array" - | "rootKeys" - | "data" - | "data-array" - | "query" - | "query-object" - | "statistics-object" - | null; - - onMetadataParsed; - hydrateColumn; - hydrateRow; - - objBuffer?: string; - currentIndex: 
number; - - results: { - rows: unknown[]; - columns: unknown[]; - statistics: any; - query?: any; - }[]; - - constructor({ - onMetadataParsed = (columns: any) => {}, - hydrateColumn = (column: string) => JSONbig.parse(column), - hydrateRow = (row: string, isLastRow: boolean): any => JSONbig.parse(row) - }) { - this.onMetadataParsed = onMetadataParsed; - this.hydrateColumn = hydrateColumn; - this.hydrateRow = hydrateRow; - - this.results = []; - - this.state = null; - this.currentIndex = 0; - } - - fillEmptyResult() { - this.results[this.currentIndex] = { - columns: [], - rows: [], - statistics: {} - }; - } - - pushColumn(column: unknown) { - this.results[this.currentIndex].columns.push(column); - } - - pushRow(row: unknown) { - this.results[this.currentIndex].rows.push(row); - } - - pushQuery(query: any) { - this.results[this.currentIndex].query = query; - } - - pushStatistics(statistics: any) { - this.results[this.currentIndex].statistics = statistics; - } - - handleRoot(line: string) { - if (line === "{") { - this.state = "rootKeys"; - if (this.results.length > 0) { - this.currentIndex += 1; - } - this.fillEmptyResult(); - } - } - - handleRootKeys(line: string) { - if (line === '"query":') { - this.state = "query"; - } else if (line === '"query": {') { - this.objBuffer = "{"; - this.state = "query-object"; - } else if (line === '"meta":') { - this.state = "meta"; - } else if (line === '"data":') { - this.state = "data"; - } else if (line === '"meta": [') { - this.state = "meta-array"; - } else if (line === '"data": [') { - this.state = "data-array"; - } else if (line === '"statistics":') { - this.objBuffer = ""; - this.state = "statistics-object"; - } else if (line === "}" || line === "},") { - this.state = null; - } - } - - handleMeta(line: string) { - if (line === "[") { - this.state = "meta-array"; - } - } - - handleMetaArray(line: string) { - if (line.match(/^},?$/)) { - const columnStr = this.objBuffer + "}"; - const column = this.hydrateColumn(columnStr); - this.pushColumn(column); - this.objBuffer = undefined; - } else if (line === "{") { - this.objBuffer = line; - } else if (line.match(/^],?$/)) { - this.onMetadataParsed(this.results[this.currentIndex].columns); - this.state = "rootKeys"; - } else { - this.objBuffer += line; - } - } - - handleDataArray(line: string) { - if (line.match(/^[\]}],?$/) && this.objBuffer) { - const rowStr = this.objBuffer + line[0]; - const row = this.hydrateRow(rowStr, false); - this.pushRow(row); - this.objBuffer = undefined; - } else if (line === "{" || line === "[") { - this.objBuffer = line; - } else if (line.match(/^],?$/)) { - this.state = "rootKeys"; - } else if (this.objBuffer === undefined) { - const isLastRow = line[line.length - 1] !== ","; - const rowStr = isLastRow ? 
line : line.substring(0, line.length - 1); - const row = this.hydrateRow(rowStr, isLastRow); - this.pushRow(row); - } else { - this.objBuffer += line; - } - } - - handleData(line: string) { - if (line === "[") { - this.state = "data-array"; - } - } - - handleQuery(line: string) { - if (line === "{") { - this.objBuffer = "{"; - this.state = "query-object"; - } - } - - handleQueryObject(line: string) { - if (line.match(/^},?$/)) { - const queryStr = this.objBuffer + "}"; - const query = JSONbig.parse(queryStr); - this.objBuffer = undefined; - this.state = "rootKeys"; - this.pushQuery(query); - } else { - this.objBuffer += line; - } - } - - handleStatisticsObject(line: string) { - if (line.match(/^},?$/)) { - const queryStr = this.objBuffer + "}"; - const statistics = JSONbig.parse(queryStr); - this.objBuffer = undefined; - this.state = "rootKeys"; - this.pushStatistics(statistics); - } else { - this.objBuffer += line; - } - } - - processLine(line: string) { - line = line.trim(); - - if (!line.length) { - return; - } - - if (this.state === null) { - this.handleRoot(line); - } else if (this.state === "rootKeys") { - this.handleRootKeys(line); - } else if (this.state === "meta") { - this.handleMeta(line); - } else if (this.state === "data") { - this.handleData(line); - } else if (this.state === "meta-array") { - this.handleMetaArray(line); - } else if (this.state === "data-array") { - this.handleDataArray(line); - } else if (this.state === "query") { - this.handleQuery(line); - } else if (this.state === "query-object") { - this.handleQueryObject(line); - } else if (this.state === "statistics-object") { - this.handleStatisticsObject(line); - } - } - - processBody(body: string) { - const lines = body.split("\n"); - for (const line of lines) { - this.processLine(line); - } - } -} diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts index b70cc2b0..6728478b 100644 --- a/src/statement/stream/serverSideStream.ts +++ b/src/statement/stream/serverSideStream.ts @@ -38,13 +38,18 @@ export class ServerSideStream extends Readable { } } } else if (parsed.message_type === "START") { - const metaDataJson = JSONbig.parse(line); - this.meta = getNormalizedMeta(metaDataJson.result_columns); + this.meta = getNormalizedMeta(parsed.result_columns); this.emit("meta", this.meta); } else if (parsed.message_type === "FINISH_SUCCESSFULLY") { this.push(null); - } else if (parsed.message_type === "FINISH_WITH_ERROR") { - this.destroy(new Error(`Result encountered an error: ${line}`)); + } else if (parsed.message_type === "FINISH_WITH_ERRORS") { + this.destroy( + new Error( + `Result encountered an error: ${parsed.errors + .map((error: { description: string }) => error.description) + .join("\n")}` + ) + ); } } else { this.destroy(new Error(`Result row could not be parsed: ${line}`)); diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 5108ba3d..7187412f 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -1,4 +1,5 @@ import { Firebolt } from "../../../src"; +import stream, { TransformCallback } from "node:stream"; const connectionParams = { auth: { @@ -26,14 +27,12 @@ describe("streams", () => { const { data } = await statement.streamResult(); let sum = 0; + const meta = await stream.once(data, "meta"); + console.log("Meta:", meta); - data - .on("meta", meta => { - console.log("Meta:", meta); - }) - .on("data", row => { - sum += row[0]; - }); + data.on("data", row => { + sum += row[0]; + }); await new 
Promise(resolve => { data.on("end", () => { @@ -76,4 +75,83 @@ describe("streams", () => { }); }); }); + it("stream transformers", async () => { + class SerializeRowStream extends stream.Transform { + public constructor() { + super({ + objectMode: true, + transform( + row: any, + encoding: BufferEncoding, + callback: TransformCallback + ) { + const transformed = JSON.stringify(row); + this.push(transformed); + this.push("\n"); + callback(); + } + }); + } + } + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + const serializedStream = new SerializeRowStream(); + const statement = await connection.executeStream( + "select * from generate_series(1, 10)" + ); + + const { data } = await statement.streamResult(); + data.pipe(serializedStream).pipe(process.stdout); + + await new Promise(resolve => { + data.on("end", () => { + resolve(null); + }); + }); + }); + it("stream syntax error", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + try { + await connection.executeStream("select *1;"); + } catch (error) { + expect(error.message).toContain( + "Line 1, Column 9: syntax error, unexpected integer, expecting end of file" + ); + expect(error.message).toContain("select *1;"); + } + }); + it("stream division by 0 error", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + const statement = await connection.executeStream( + "select 1/(i-100000) as a from generate_series(1,100000) as i" + ); + + // let errorThrown = false; + const { data } = await statement.streamResult(); + data.on("meta", meta => { + expect(meta).toEqual([ + { + name: "a", + type: "integer" + } + ]); + }); + const [error] = await stream.once(data, "error"); + expect(error.message).toEqual( + "Result encountered an error: Line 1, Column 9: Division by zero\n" + + "select 1/(i-100000) as a from generate_series(1,...\n" + + " ^" + ); + }); }); diff --git a/test/unit/v2/connection.test.ts b/test/unit/v2/connection.test.ts index 21b47c16..c4cd530c 100644 --- a/test/unit/v2/connection.test.ts +++ b/test/unit/v2/connection.test.ts @@ -4,6 +4,7 @@ import { Firebolt } from "../../../src"; import { ConnectionOptions } from "../../../src/types"; import { QUERY_URL } from "../../../src/common/api"; import { inMemoryCache } from "../../../src/common/tokenCache"; +import stream from "node:stream"; const apiEndpoint = "api.fake.firebolt.io"; @@ -614,7 +615,17 @@ describe("Connection V2", () => { ] }), JSON.stringify({ - message_type: "FINISH_WITH_ERROR" + message_type: "FINISH_WITH_ERRORS", + errors: [ + { + description: + "Line 1, Column 9: syntax error, unexpected identifier, expecting end of file select *1;", + location: { + failing_line: 1, + start_offset: 9 + } + } + ] }) ].join("\n"); @@ -626,8 +637,7 @@ describe("Connection V2", () => { const body = await req.text(); const urlParams = Object.fromEntries(req.url.searchParams.entries()); if ( - body.includes("select") && - body.includes("generate_series") && + body.includes("select *1;") && urlParams["output_format"] === "JSONLines_Compact" ) { return res(ctx.body(jsonLines)); @@ -645,9 +655,7 @@ describe("Connection V2", () => { }; const connection = await firebolt.connect(connectionParams); - const streamStatement = await 
connection.executeStream( - "select 1 from generate_series(1, 2))" - ); + const streamStatement = await connection.executeStream("select *1;"); const { data } = await streamStatement.streamResult(); data .on("meta", meta => { @@ -658,15 +666,12 @@ describe("Connection V2", () => { } ]); }) - .on("data", row => { + .on("data", () => { fail('"Data should not be emitted"'); - }) - .on("error", error => { - expect(error).toEqual( - new Error( - 'Result encountered an error: {"message_type":"FINISH_WITH_ERROR"}' - ) - ); }); + const [error] = await stream.once(data, "error"); + expect(error.message).toEqual( + "Result encountered an error: Line 1, Column 9: syntax error, unexpected identifier, expecting end of file select *1;" + ); }); }); From f6679022ba71e6e05271709feef8f764a40612ee Mon Sep 17 00:00:00 2001 From: Bogdan Truta Date: Wed, 16 Apr 2025 11:02:49 +0300 Subject: [PATCH 5/9] add details to readme --- README.md | 17 +++++++++++++++++ test/integration/v2/stream.test.ts | 1 - 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 20ddc754..cdbf2abd 100644 --- a/README.md +++ b/README.md @@ -647,7 +647,24 @@ console.log(meta); console.log(rows) ``` In case an errors occurs before streaming, or during the first packet, the error will be thrown by the executeStream method. If the error occurs during streaming, it will be emitted by the stream. +```typescript +try { + await connection.executeStream("select *1;"); +} catch (error) { + //error is thrown directly since this is a syntax error +} + +const statement = await connection.executeStream( + "select 1/(i-100000) as a from generate_series(1,100000) as i" +); +const { data } = await statement.streamResult(); +data.on("error", error => { + //error is emitted by the stream after first chunk of results + console.log(error); +}); + +``` ```typescript diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 7187412f..8f24d8fb 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -137,7 +137,6 @@ describe("streams", () => { "select 1/(i-100000) as a from generate_series(1,100000) as i" ); - // let errorThrown = false; const { data } = await statement.streamResult(); data.on("meta", meta => { expect(meta).toEqual([ From 0397b9850f2ae8a877dce43d51039761a436a860 Mon Sep 17 00:00:00 2001 From: Bogdan Truta Date: Thu, 17 Apr 2025 23:44:56 +0300 Subject: [PATCH 6/9] address CR --- README.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index cdbf2abd..fd7d0a68 100644 --- a/README.md +++ b/README.md @@ -617,8 +617,8 @@ await database.delete(); ### Streaming results -Streaming can only be used with a v2 connection and only using the execute stream method. -The streamResult method from normal the normal execute method returns an in-memory stream of result, rather than dynamically fetching them from the response. +Streaming can only be used with a v2 connection and only using the `executeStream` method. +The streamResult method for the normal `execute` method returns an in-memory stream of result, rather than dynamically fetching them from the response. This is further explained in the [in-memory stream](#in-memory-stream) section. 
The recommended way to consume query results is by using streams with standard events: * `data.on('meta')` @@ -665,7 +665,7 @@ data.on("error", error => { }); ``` -```typescript + ### Custom stream transformers @@ -708,7 +708,11 @@ data.pipe(serializedStream).pipe(process.stdout); ### In-memory stream -When using the streamResult method on the object returned from a simple execute method, the driver will return an in-memory stream of the result. This is useful for small result sets, but not recommended for large result sets. +When using the streamResult method on the object returned from a simple execute method, the driver will return +an in-memory stream of the result. This is useful for small result sets, but not recommended for large result sets. + +In this case the whole result will be first fetched in memory and then made available via streamResult. +This is done for compatibility reasons and has no performance benefits compared to using fetchResult ```typescript const firebolt = Firebolt(); From ea2c2e6dc665f4fabad1bbb3414d0333e119988f Mon Sep 17 00:00:00 2001 From: Bogdan Truta Date: Wed, 23 Apr 2025 14:14:44 +0300 Subject: [PATCH 7/9] add type tests --- src/statement/dataTypes.ts | 8 +- test/integration/v2/fetchTypes.test.ts | 152 ++++++++++++++++++++++++- 2 files changed, 157 insertions(+), 3 deletions(-) diff --git a/src/statement/dataTypes.ts b/src/statement/dataTypes.ts index d4252703..a1001c6d 100644 --- a/src/statement/dataTypes.ts +++ b/src/statement/dataTypes.ts @@ -11,13 +11,16 @@ const typeMapping = { decimal: "decimal", decimal_ext: "decimal", long: "long", + bigint: "long", float: "float", + real: "float", float32: "float", float64: "double", double: "double", double32: "double", double64: "double", - integer: "integer", + "double precision": "double", + integer: "int", int: "int", uint: "int", int8: "int", @@ -42,7 +45,7 @@ const getMappedType = (innerType: string) => { ) { return typeMapping.timestamp; } - if (innerType.match(/decimal(.+)/i)) { + if (innerType.match(/decimal(.+)/i) || innerType.match(/numeric(.+)/i)) { return typeMapping.decimal; } }; @@ -83,6 +86,7 @@ export const STRING_TYPES = withNullableTypes(["string", "text"]); export const BYTEA_TYPES = withNullableTypes(["bytea"]); +//todo fix nullable types FIR-45354 export const getFireboltType = (type: string): string => { const key = type.toLowerCase(); const match = key.match(COMPLEX_TYPE); diff --git a/test/integration/v2/fetchTypes.test.ts b/test/integration/v2/fetchTypes.test.ts index 71638117..afa8f156 100644 --- a/test/integration/v2/fetchTypes.test.ts +++ b/test/integration/v2/fetchTypes.test.ts @@ -1,5 +1,6 @@ -import { Firebolt } from "../../../src/index"; +import { Firebolt } from "../../../src"; import BigNumber from "bignumber.js"; +import stream from "node:stream"; const connectionParams = { auth: { @@ -158,4 +159,153 @@ describe("test type casting on fetch", () => { await connection.execute("DROP TABLE IF EXISTS test_struct_helper"); } }); + //todo fix nullable types FIR-45354 + it("select all types", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect({ + ...connectionParams, + engineName: process.env.FIREBOLT_ENGINE_NAME as string + }); + const statement = await connection.execute( + "select 1 as col_int,\n" + + " null::int as col_int_null,\n" + + " 30000000000 as col_long,\n" + + " null::bigint as col_long_null,\n" + + " 1.23::float4 as col_float,\n" + + " null::float4 as 
col_float_null,\n" + + " 1.23456789012 as col_double,\n" + + " null::double as col_double_null,\n" + + " 'text' as col_text,\n" + + " null::text as col_text_null,\n" + + " '2021-03-28'::date as col_date,\n" + + " null::date as col_date_null,\n" + + " '2019-07-31 01:01:01'::timestamp as col_timestamp,\n" + + " null::timestamp as col_timestamp_null,\n" + + " '1111-01-05 17:04:42.123456'::timestamptz as col_timestamptz,\n" + + " null::timestamptz as col_timestamptz_null,\n" + + " true as col_boolean,\n" + + " null::bool as col_boolean_null,\n" + + " [1,2,3,4] as col_array,\n" + + // " null::array(int) as col_array_null,\n" + + " '1231232.123459999990457054844258706536'::decimal(38, 30) as col_decimal,\n" + + // " null::decimal(38, 30) as col_decimal_null,\n" + + " 'abc123'::bytea as col_bytea,\n" + + " null::bytea as col_bytea_null,\n" + + " 'point(1 2)'::geography as col_geography,\n" + + " null::geography as col_geography_null," + ); + const { data, meta } = await statement.fetchResult(); + const metaObjects = [ + { name: "col_int", type: "int" }, + { name: "col_int_null", type: "int null" }, + { name: "col_long", type: "long" }, + { name: "col_long_null", type: "long null" }, + { name: "col_float", type: "float" }, + { name: "col_float_null", type: "float null" }, + { name: "col_double", type: "double" }, + { name: "col_double_null", type: "double null" }, + { name: "col_text", type: "text" }, + { name: "col_text_null", type: "text null" }, + { name: "col_date", type: "date" }, + { name: "col_date_null", type: "date null" }, + { name: "col_timestamp", type: "timestamp" }, + { name: "col_timestamp_null", type: "timestamp null" }, + { name: "col_timestamptz", type: "timestamptz" }, + { name: "col_timestamptz_null", type: "timestamptz null" }, + { name: "col_boolean", type: "boolean" }, + { name: "col_boolean_null", type: "boolean null" }, + { name: "col_array", type: "array(int)" }, + // { name: "col_array_null", type: "array(int) null" }, + // { name: "col_decimal", type: "decimal(38, 30)" }, + { name: "col_decimal", type: "decimal" }, + // { name: "col_decimal_null", type: "decimal(38, 30) null" }, + { name: "col_bytea", type: "bytea" }, + { name: "col_bytea_null", type: "bytea null" }, + { name: "col_geography", type: "geography" }, + { name: "col_geography_null", type: "geography null" } + ]; + for (let i = 0; i < meta.length; i++) { + expect(meta[i]).toEqual(metaObjects[i]); + } + }); + //todo fix nullable types FIR-45354 + it("select all types in streaming", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect({ + ...connectionParams, + engineName: process.env.FIREBOLT_ENGINE_NAME as string + }); + const statement = await connection.executeStream( + "select 1 as col_int,\n" + + " null::int as col_int_null,\n" + + " 30000000000 as col_long,\n" + + " null::bigint as col_long_null,\n" + + " 1.23::float4 as col_float,\n" + + " null::float4 as col_float_null,\n" + + " 1.23456789012 as col_double,\n" + + " null::double as col_double_null,\n" + + " 'text' as col_text,\n" + + " null::text as col_text_null,\n" + + " '2021-03-28'::date as col_date,\n" + + " null::date as col_date_null,\n" + + " '2019-07-31 01:01:01'::timestamp as col_timestamp,\n" + + " null::timestamp as col_timestamp_null,\n" + + " '1111-01-05 17:04:42.123456'::timestamptz as col_timestamptz,\n" + + " null::timestamptz as col_timestamptz_null,\n" + + " true as col_boolean,\n" + + " null::bool as col_boolean_null,\n" + + " 
[1,2,3,4] as col_array,\n" + + // " null::array(int) as col_array_null,\n" + + " '1231232.123459999990457054844258706536'::decimal(38, 30) as col_decimal,\n" + + // " null::decimal(38, 30) as col_decimal_null,\n" + + " 'abc123'::bytea as col_bytea,\n" + + " null::bytea as col_bytea_null,\n" + + " 'point(1 2)'::geography as col_geography,\n" + + " null::geography as col_geography_null," + ); + const { data } = await statement.streamResult(); + const [meta] = await stream.once(data, "meta"); + const metaObjects = [ + { name: "col_int", type: "int" }, + // { name: "col_int_null", type: "int null" }, + { name: "col_int_null", type: "integer null" }, + { name: "col_long", type: "long" }, + // { name: "col_long_null", type: "long null" }, + { name: "col_long_null", type: "bigint null" }, + { name: "col_float", type: "float" }, + // { name: "col_float_null", type: "float null" }, + { name: "col_float_null", type: "real null" }, + { name: "col_double", type: "double" }, + // { name: "col_double_null", type: "double null" }, + { name: "col_double_null", type: "double precision null" }, + { name: "col_text", type: "text" }, + { name: "col_text_null", type: "text null" }, + { name: "col_date", type: "date" }, + { name: "col_date_null", type: "date null" }, + { name: "col_timestamp", type: "timestamp" }, + { name: "col_timestamp_null", type: "timestamp null" }, + { name: "col_timestamptz", type: "timestamptz" }, + { name: "col_timestamptz_null", type: "timestamptz null" }, + { name: "col_boolean", type: "boolean" }, + { name: "col_boolean_null", type: "boolean null" }, + { name: "col_array", type: "array(int)" }, + // { name: "col_array_null", type: "array(int) null" }, + // { name: "col_decimal", type: "decimal(38, 30)" }, + { name: "col_decimal", type: "decimal" }, + // { name: "col_decimal_null", type: "decimal(38, 30) null" }, + { name: "col_bytea", type: "bytea" }, + { name: "col_bytea_null", type: "bytea null" }, + { name: "col_geography", type: "geography" }, + { name: "col_geography_null", type: "geography null" } + ]; + for (let i = 0; i < meta.length; i++) { + expect(meta[i]).toEqual(metaObjects[i]); + } + }); }); From 3b31b58d4cd12e23370e5c876975a49b34363fe3 Mon Sep 17 00:00:00 2001 From: Bogdan Truta Date: Wed, 23 Apr 2025 18:51:36 +0300 Subject: [PATCH 8/9] fix tests --- test/unit/v2/connection.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/unit/v2/connection.test.ts b/test/unit/v2/connection.test.ts index c4cd530c..ae45ea3c 100644 --- a/test/unit/v2/connection.test.ts +++ b/test/unit/v2/connection.test.ts @@ -509,7 +509,7 @@ describe("Connection V2", () => { expect(meta).toEqual([ { name: "?column?", - type: "integer" + type: "int" } ]); }) @@ -587,7 +587,7 @@ describe("Connection V2", () => { expect(meta).toEqual([ { name: "?column?", - type: "integer" + type: "int" } ]); }) @@ -662,7 +662,7 @@ describe("Connection V2", () => { expect(meta).toEqual([ { name: "?column?", - type: "integer" + type: "int" } ]); }) From e68041218b3c197333d6b6cf8d11fa737995b3bb Mon Sep 17 00:00:00 2001 From: Bogdan Truta Date: Thu, 24 Apr 2025 13:10:02 +0300 Subject: [PATCH 9/9] fix sonar and CR --- README.md | 2 +- src/statement/dataTypes.ts | 9 +++++--- src/statement/stream/serverSideStream.ts | 28 ++++++++++++++---------- test/integration/v2/stream.test.ts | 2 +- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index fd7d0a68..402f3c99 100644 --- a/README.md +++ b/README.md @@ -617,7 +617,7 @@ await database.delete(); ### Streaming 
results -Streaming can only be used with a v2 connection and only using the `executeStream` method. +Streaming can only be used with the `executeStream` method. The streamResult method for the normal `execute` method returns an in-memory stream of result, rather than dynamically fetching them from the response. This is further explained in the [in-memory stream](#in-memory-stream) section. The recommended way to consume query results is by using streams with standard events: diff --git a/src/statement/dataTypes.ts b/src/statement/dataTypes.ts index a1001c6d..71abe89d 100644 --- a/src/statement/dataTypes.ts +++ b/src/statement/dataTypes.ts @@ -40,12 +40,15 @@ const getMappedType = (innerType: string) => { return type; } if ( - innerType.match(/datetime64(.+)/i) || - innerType.match(/timestamp_ext(.+)/i) + RegExp(/datetime64(.+)/i).exec(innerType) || + RegExp(/timestamp_ext(.+)/i).exec(innerType) ) { return typeMapping.timestamp; } - if (innerType.match(/decimal(.+)/i) || innerType.match(/numeric(.+)/i)) { + if ( + RegExp(/decimal(.+)/i).exec(innerType) || + RegExp(/numeric(.+)/i).exec(innerType) + ) { return typeMapping.decimal; } }; diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts index 6728478b..18d7aa82 100644 --- a/src/statement/stream/serverSideStream.ts +++ b/src/statement/stream/serverSideStream.ts @@ -27,16 +27,7 @@ export class ServerSideStream extends Readable { const parsed = JSONbig.parse(line); if (parsed) { if (parsed.message_type === "DATA") { - if (parsed.data) { - const normalizedData = normalizeResponseRowStreaming( - parsed.data, - executeQueryOptions, - this.meta - ); - for (const data of normalizedData) { - this.emit("data", data); - } - } + this.processData(parsed); } else if (parsed.message_type === "START") { this.meta = getNormalizedMeta(parsed.result_columns); this.emit("meta", this.meta); @@ -66,5 +57,20 @@ export class ServerSideStream extends Readable { }); } - _read() {} + private processData(parsed: { data: any[] }) { + if (parsed.data) { + const normalizedData = normalizeResponseRowStreaming( + parsed.data, + this.executeQueryOptions, + this.meta + ); + for (const data of normalizedData) { + this.emit("data", data); + } + } + } + + _read() { + /* _read method requires implementation, even if data comes from other sources */ + } } diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 8f24d8fb..4ab00ab7 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -142,7 +142,7 @@ describe("streams", () => { expect(meta).toEqual([ { name: "a", - type: "integer" + type: "int" } ]); });
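
---

Taken together, the patches above add `executeStream` to the connection and a `ServerSideStream` readable that emits `meta`, `data`, `end`, and `error` events. The sketch below is a minimal consumer of that API and is not part of the patch series itself: the `firebolt-sdk` import path and the connection parameters are assumptions (placeholders for however the SDK is installed and configured), and it uses `once` from `node:events` rather than the `stream.once` helper the integration tests rely on.

```typescript
// Minimal usage sketch for the streaming API added in this patch series.
// Assumptions: the SDK is consumed as the "firebolt-sdk" package and the
// usual v2 connection parameters are provided via environment variables.
import { once } from "node:events";
import { Firebolt } from "firebolt-sdk";

async function main() {
  const firebolt = Firebolt({
    apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
  });

  const connection = await firebolt.connect({
    auth: {
      client_id: process.env.FIREBOLT_CLIENT_ID as string,
      client_secret: process.env.FIREBOLT_CLIENT_SECRET as string
    },
    account: process.env.FIREBOLT_ACCOUNT as string,
    database: process.env.FIREBOLT_DATABASE as string,
    engineName: process.env.FIREBOLT_ENGINE_NAME as string
  });

  // Errors reported before streaming starts (e.g. syntax errors) are thrown
  // here; errors that arrive mid-stream are emitted as "error" on `data`.
  const statement = await connection.executeStream(
    "select i from generate_series(1, 1000) as i",
    { response: { normalizeData: true } } // rows become objects keyed by column name
  );

  const { data } = await statement.streamResult();

  let rowCount = 0;
  data
    .on("meta", meta => console.log("columns:", meta)) // normalized Meta[] from the START message
    .on("data", () => {
      rowCount += 1;
    })
    .on("error", err => console.error("stream failed:", err));

  // "end" follows the FINISH_SUCCESSFULLY message; events.once() rejects
  // this promise if "error" is emitted first.
  await once(data, "end");
  console.log("rows received:", rowCount);
}

main().catch(console.error);
```

The same statement also accepts the usual `ExecuteQueryOptions` (for example a custom `hydrateRow`), since `normalizeResponseRowStreaming` reuses the existing hydration and normalization path per emitted chunk.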