Skip to content

Commit 3c8b12f

Browse files
feat: FIR-43724 implement streaming in node sdk
1 parent 4fc08c2 commit 3c8b12f

File tree

10 files changed

+212
-38
lines changed

10 files changed

+212
-38
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ npm-debug.log*
99
.#*
1010
coverage/
1111
allure-results/
12+
.idea

package-lock.json

Lines changed: 36 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/connection/base.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import { CompositeError } from "../common/errors";
1010
import JSONbig from "json-bigint";
1111
import { QueryFormatter } from "../formatter/base";
1212
import { AsyncStatement } from "../statement/async";
13+
import { StreamStatement } from "../statement/stream";
14+
import { Response, Headers } from "node-fetch";
1315

1416
const defaultQuerySettings = {
1517
output_format: OutputFormat.COMPACT
@@ -166,6 +168,11 @@ export abstract class Connection {
166168
executeQueryOptions?: ExecuteQueryOptions
167169
): Promise<AsyncStatement>;
168170

171+
abstract executeStream(
172+
query: string,
173+
executeQueryOptions?: ExecuteQueryOptions
174+
): Promise<StreamStatement>;
175+
169176
abstract isAsyncQueryRunning(token: string): Promise<boolean>;
170177

171178
abstract isAsyncQuerySuccessful(token: string): Promise<boolean | undefined>;

src/connection/connection_v1.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { Connection as BaseConnection } from "./base";
33
import { ResourceManager } from "../service";
44
import { ExecuteQueryOptions } from "../types";
55
import { AsyncStatement } from "../statement/async";
6+
import { StreamStatement } from "../statement/stream";
67

78
export interface AccountInfo {
89
id: string;
@@ -93,4 +94,13 @@ export class ConnectionV1 extends BaseConnection {
9394
"Asynchronous execution is not supported in this Firebolt version."
9495
);
9596
}
97+
98+
async executeStream(
99+
query: string,
100+
executeQueryOptions?: ExecuteQueryOptions
101+
): Promise<StreamStatement> {
102+
throw new Error(
103+
"Asynchronous execution is not supported in this Firebolt version."
104+
);
105+
}
96106
}

src/connection/connection_v2.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ import { ACCOUNT_SYSTEM_ENGINE, QUERY_URL } from "../common/api";
33

44
import { Connection as BaseConnection } from "./base";
55
import { Cache, inMemoryCache, noneCache } from "../common/tokenCache";
6-
import { ExecuteQueryOptions } from "../types";
6+
import { ExecuteQueryOptions, OutputFormat } from "../types";
77
import { AsyncStatement } from "../statement/async";
8+
import { StreamStatement } from "../statement/stream";
89

910
export class ConnectionV2 extends BaseConnection {
1011
private get account(): string {
@@ -103,6 +104,24 @@ export class ConnectionV2 extends BaseConnection {
103104
});
104105
}
105106

107+
async executeStream(
108+
query: string,
109+
executeQueryOptions: ExecuteQueryOptions = {}
110+
): Promise<StreamStatement> {
111+
const { response } = await this.prepareAndExecuteQuery(query, {
112+
...executeQueryOptions,
113+
settings: {
114+
...executeQueryOptions?.settings,
115+
output_format: OutputFormat.JSON_LINES
116+
}
117+
});
118+
119+
return new StreamStatement({
120+
response,
121+
executeQueryOptions
122+
});
123+
}
124+
106125
private async getAsyncQueryInfo(token: string) {
107126
const query = `CALL fb_GetAsyncStatus('${token}')`;
108127

src/statement/normalizeResponse.ts

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
2-
QueryResponse,
3-
OutputFormat,
42
ExecuteQueryOptions,
3+
OutputFormat,
4+
QueryResponse,
55
Row
66
} from "../types";
77
import { Meta } from "../meta";
@@ -35,11 +35,11 @@ export const normalizeColumn = (column: { name: string; type: string }) => {
3535
return new Meta(column);
3636
};
3737

38-
const getNormalizedMeta = (response: ParsedResponse): Meta[] => {
39-
if (!response.meta) {
38+
export const getNormalizedMeta = (meta: any[]): Meta[] => {
39+
if (!meta) {
4040
return [];
4141
}
42-
return response.meta.map(normalizeColumn);
42+
return meta.map(normalizeColumn);
4343
};
4444

4545
export const getNormalizedStatistics = (response: ParsedResponse) => {
@@ -71,7 +71,7 @@ export const normalizeResponse = (
7171

7272
const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow;
7373

74-
const meta = getNormalizedMeta(response);
74+
const meta = getNormalizedMeta(response.meta);
7575

7676
const statistics = getNormalizedStatistics(response);
7777

@@ -96,3 +96,21 @@ export const normalizeResponse = (
9696
statistics
9797
};
9898
};
99+
100+
export const normalizeResponseRowStreaming = (
101+
data: any[],
102+
executeQueryOptions: ExecuteQueryOptions,
103+
meta: Meta[]
104+
): Row[] => {
105+
const { response: { normalizeData = false } = {} } = executeQueryOptions;
106+
107+
const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow;
108+
109+
return data.map((row: Row) => {
110+
const hydratedRow = hydrate(row, meta, executeQueryOptions);
111+
if (normalizeData) {
112+
return normalizeRow(hydratedRow, meta, executeQueryOptions);
113+
}
114+
return hydratedRow;
115+
});
116+
};

src/statement/stream.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { ExecuteQueryOptions } from "../types";
2+
import { ServerSideStream } from "./stream/serverSideStream";
3+
import { Response } from "node-fetch";
4+
5+
export class StreamStatement {
6+
private readonly executeQueryOptions: ExecuteQueryOptions;
7+
private readonly response: Response;
8+
9+
constructor({
10+
response,
11+
executeQueryOptions
12+
}: {
13+
response: Response;
14+
executeQueryOptions: ExecuteQueryOptions;
15+
}) {
16+
this.response = response;
17+
this.executeQueryOptions = executeQueryOptions;
18+
}
19+
20+
async streamResult() {
21+
if (!this.response.body) {
22+
throw new Error("Response body is not readable");
23+
}
24+
return {
25+
data: new ServerSideStream(this.response, this.executeQueryOptions)
26+
};
27+
}
28+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { Readable } from "stream";
2+
import JSONbig from "json-bigint";
3+
import readline from "readline";
4+
import {
5+
getNormalizedMeta,
6+
normalizeResponseRowStreaming
7+
} from "../normalizeResponse";
8+
import { Response } from "node-fetch";
9+
import { ExecuteQueryOptions } from "../../types";
10+
import { Meta } from "../../meta";
11+
12+
export class ServerSideStream extends Readable {
13+
private meta: Meta[] = [];
14+
constructor(
15+
private readonly response: Response,
16+
private readonly executeQueryOptions: ExecuteQueryOptions
17+
) {
18+
super({ objectMode: true });
19+
const readLine = readline.createInterface({
20+
input: response.body,
21+
crlfDelay: Infinity
22+
});
23+
24+
const lineParser = (line: string) => {
25+
try {
26+
if (line.trim()) {
27+
const parsed = JSONbig.parse(line);
28+
if (parsed) {
29+
if (parsed.message_type === "DATA") {
30+
if (parsed.data) {
31+
const normalizedData = normalizeResponseRowStreaming(
32+
parsed.data,
33+
executeQueryOptions,
34+
this.meta
35+
);
36+
for (const data of normalizedData) {
37+
this.emit("data", data);
38+
}
39+
}
40+
} else if (parsed.message_type === "START") {
41+
const metaDataJson = JSONbig.parse(line);
42+
this.meta = getNormalizedMeta(metaDataJson.result_columns);
43+
this.emit("meta", this.meta);
44+
} else if (parsed.message_type === "FINISH_SUCCESSFULLY") {
45+
this.push(null);
46+
} else if (parsed.message_type === "FINISH_WITH_ERROR") {
47+
this.destroy(new Error(`Result encountered an error: ${line}`));
48+
}
49+
} else {
50+
this.destroy(new Error(`Result row could not be parsed: ${line}`));
51+
}
52+
}
53+
} catch (err) {
54+
this.destroy(err);
55+
}
56+
};
57+
readLine.on("line", lineParser);
58+
59+
readLine.on("close", () => {
60+
this.push(null);
61+
});
62+
}
63+
64+
_read() {}
65+
}

src/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ export enum OutputFormat {
2626
// JSON_COMPACT_LIMITED = "FB_JSONCompactLimited",
2727
COMPACT = "JSON_Compact", // supported in v3 of packdb
2828
JSON_COMPACT = "JSONCompact", // to be as above, after ensure all clients has v3
29-
JSON = "JSON"
29+
JSON = "JSON",
30+
JSON_LINES = "JSONLines_Compact"
3031
}
3132

3233
export type QuerySettings = Record<

test/integration/v1/stream.test.ts

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,43 @@
1-
import stream, { TransformCallback } from "stream";
2-
import { Firebolt } from "../../../src/index";
1+
import { Firebolt } from "../../../src";
32

43
const connectionParams = {
54
auth: {
6-
username: process.env.FIREBOLT_USERNAME as string,
7-
password: process.env.FIREBOLT_PASSWORD as string
5+
client_id: process.env.FIREBOLT_CLIENT_ID as string,
6+
client_secret: process.env.FIREBOLT_CLIENT_SECRET as string
87
},
8+
account: process.env.FIREBOLT_ACCOUNT as string,
99
database: process.env.FIREBOLT_DATABASE as string,
1010
engineName: process.env.FIREBOLT_ENGINE_NAME as string
1111
};
1212

13-
jest.setTimeout(20000);
13+
jest.setTimeout(250000);
1414

1515
describe("streams", () => {
16-
it("stream transformters", async () => {
17-
class SerializeRowStream extends stream.Transform {
18-
public constructor() {
19-
super({
20-
objectMode: true,
21-
transform(
22-
row: any,
23-
encoding: BufferEncoding,
24-
callback: TransformCallback
25-
) {
26-
const transformed = JSON.stringify(row);
27-
this.push(transformed);
28-
this.push("\n");
29-
callback();
30-
}
31-
});
32-
}
33-
}
34-
16+
it("stream transformers", async () => {
3517
const firebolt = Firebolt({
3618
apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
3719
});
3820

39-
const serializedStream = new SerializeRowStream();
40-
4121
const connection = await firebolt.connect(connectionParams);
4222

43-
const statement = await connection.execute("select 1 union all select 2");
23+
const statement = await connection.executeStream(
24+
`select 1 from generate_series(1, 250000000)` //~1 GB response
25+
);
4426

4527
const { data } = await statement.streamResult();
46-
47-
data.pipe(serializedStream).pipe(process.stdout);
28+
let sum = 0;
29+
30+
data
31+
.on("meta", meta => {
32+
console.log("Meta:", meta);
33+
})
34+
.on("data", row => {
35+
sum += row[0];
36+
});
4837

4938
await new Promise(resolve => {
5039
data.on("end", () => {
40+
expect(sum).toEqual(250000000);
5141
resolve(null);
5242
});
5343
});

0 commit comments

Comments
 (0)