Skip to content

Commit 34352fb

Browse files
authored
Multistatement responses (#36)
1 parent ebcdab3 commit 34352fb

File tree

5 files changed

+345
-147
lines changed

5 files changed

+345
-147
lines changed

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,7 @@ export {
3838
QueryFormatter
3939
} from "./formatter";
4040

41+
export { JSONParser } from "./statement/stream/parser";
42+
4143
export type { Connection } from "./connection";
4244
export type { Meta } from "./meta";

src/statement/index.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export class Statement {
6969

7070
async streamResult(options?: StreamOptions) {
7171
const response = await this.request.ready();
72-
const jsonParser = new JSONStream({
72+
const jsonStream = new JSONStream({
7373
emitter: this.rowStream,
7474
options,
7575
executeQueryOptions: this.executeQueryOptions
@@ -125,26 +125,28 @@ export class Statement {
125125
.split("\n");
126126
try {
127127
for (const line of lines) {
128-
jsonParser.processLine(line);
128+
jsonStream.processLine(line);
129129
}
130130
} catch (error) {
131131
errorHandler(error);
132132
return;
133133
}
134134

135-
for (const row of jsonParser.rows) {
135+
const result = jsonStream.getResult(0);
136+
// for now only supports single statement sql
137+
for (const row of result.rows) {
136138
this.rowStream.push(row);
137139
}
138140

139-
jsonParser.rows = [];
141+
result.rows = [];
140142
str = rest;
141143
}
142144
});
143145

144146
response.body.on("end", () => {
145147
try {
146-
const rest = jsonParser.parseRest();
147-
const statistics = getNormalizedStatistics(rest);
148+
const result = jsonStream.getResult(0);
149+
const statistics = getNormalizedStatistics(result.statistics);
148150
this.rowStream.emit("statistics", statistics);
149151
this.rowStream.push(null);
150152
} catch (error) {

src/statement/stream/jsonStream.ts

Lines changed: 27 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,14 @@ import { Meta } from "../../meta";
44
import { normalizeColumn, normalizeRow } from "../normalizeResponse";
55
import { hydrateRow } from "../hydrateResponse";
66
import { RowStream } from "./rowStream";
7+
import { JSONParser } from "./parser";
78

89
export class JSONStream {
10+
jsonParser: JSONParser;
911
options?: StreamOptions;
1012
executeQueryOptions: ExecuteQueryOptions;
1113
emitter: RowStream;
1214
rowParser: RowParser;
13-
state:
14-
| "meta"
15-
| "meta-array"
16-
| "rootKeys"
17-
| "data"
18-
| "data-array"
19-
| "query"
20-
| "query-object"
21-
| null;
22-
23-
columns: Meta[];
24-
rows: unknown[];
25-
rest: string;
26-
27-
objBuffer?: string;
2815

2916
constructor({
3017
emitter,
@@ -35,151 +22,50 @@ export class JSONStream {
3522
options?: StreamOptions;
3623
executeQueryOptions: ExecuteQueryOptions;
3724
}) {
38-
this.state = null;
3925
this.emitter = emitter;
4026
this.options = options;
4127
this.executeQueryOptions = executeQueryOptions;
42-
4328
this.rowParser = this.options?.rowParser || this.defaultRowParser;
44-
this.columns = [];
45-
this.rows = [];
46-
this.rest = "{";
29+
30+
this.jsonParser = new JSONParser({
31+
onMetadataParsed: columns => {
32+
this.emitter.emit("metadata", columns);
33+
},
34+
hydrateRow: this.rowParser,
35+
hydrateColumn: (columnStr: string) => {
36+
const column = JSONbig.parse(columnStr);
37+
return normalizeColumn(column);
38+
}
39+
});
4740
}
4841

49-
defaultRowParser(row: string, isLastRow: boolean) {
42+
defaultRowParser = (row: string, isLastRow: boolean) => {
5043
const normalizeData = this.executeQueryOptions.response?.normalizeData;
5144
const parsed = JSONbig.parse(row);
5245
const hydrate = this.executeQueryOptions.response?.hydrateRow || hydrateRow;
53-
const hydratedRow = hydrate(parsed, this.columns, this.executeQueryOptions);
46+
const result = this.getResult(0);
47+
const columns = result.columns;
48+
const hydratedRow = hydrate(
49+
parsed,
50+
columns as Meta[],
51+
this.executeQueryOptions
52+
);
5453
if (normalizeData) {
5554
const normalizedRow = normalizeRow(
5655
hydratedRow,
57-
this.columns,
56+
columns as Meta[],
5857
this.executeQueryOptions
5958
);
6059
return normalizedRow;
6160
}
6261
return hydratedRow;
63-
}
64-
65-
parseRest() {
66-
const parsed = JSONbig.parse(this.rest);
67-
return parsed;
68-
}
69-
70-
handleRoot(line: string) {
71-
if (line === "{") {
72-
this.state = "rootKeys";
73-
}
74-
}
75-
76-
handleRootKeys(line: string) {
77-
if (line === "query") {
78-
this.state = "query";
79-
} else if (line === '"query": {') {
80-
this.state = "query-object";
81-
} else if (line === '"meta":') {
82-
this.state = "meta";
83-
} else if (line === '"data":') {
84-
this.state = "data";
85-
} else if (line === '"meta": [') {
86-
this.state = "meta-array";
87-
} else if (line === '"data": [') {
88-
this.state = "data-array";
89-
} else {
90-
this.rest += line;
91-
}
92-
}
93-
94-
handleMeta(line: string) {
95-
if (line === "[") {
96-
this.state = "meta-array";
97-
}
98-
}
99-
100-
handleMetaArray(line: string) {
101-
if (line.match(/^},?$/)) {
102-
const columnStr = this.objBuffer + "}";
103-
const column = JSONbig.parse(columnStr);
104-
const normalizedColumn = normalizeColumn(column);
105-
this.columns.push(normalizedColumn);
106-
this.objBuffer = undefined;
107-
} else if (line === "{") {
108-
this.objBuffer = line;
109-
} else if (line.match(/^],?$/)) {
110-
this.emitter.emit("metadata", this.columns);
111-
this.state = "rootKeys";
112-
} else {
113-
this.objBuffer += line;
114-
}
115-
}
116-
117-
handleDataArray(line: string) {
118-
if (line.match(/^[\]}],?$/) && this.objBuffer) {
119-
const rowStr = this.objBuffer + line[0];
120-
const row = this.rowParser(rowStr, false);
121-
this.rows.push(row);
122-
this.objBuffer = undefined;
123-
} else if (line === "{" || line === "[") {
124-
this.objBuffer = line;
125-
} else if (line.match(/^],?$/)) {
126-
this.state = "rootKeys";
127-
} else if (this.objBuffer === undefined) {
128-
const isLastRow = line[line.length - 1] !== ",";
129-
const rowStr = isLastRow ? line : line.substr(0, line.length - 1);
130-
const row = this.rowParser(rowStr, isLastRow);
131-
this.rows.push(row);
132-
} else {
133-
this.objBuffer += line;
134-
}
135-
}
136-
137-
handleData(line: string) {
138-
if (line === "[") {
139-
this.state = "data-array";
140-
}
141-
}
142-
143-
handleQuery(line: string) {
144-
if (line === "{") {
145-
this.state = "query-object";
146-
}
147-
}
148-
149-
handleQueryObject(line: string) {
150-
if (line.match(/^},?$/)) {
151-
const queryStr = this.objBuffer + "}";
152-
const query = JSONbig.parse(queryStr);
153-
this.objBuffer = undefined;
154-
this.state = "rootKeys";
155-
} else {
156-
this.objBuffer += line;
157-
}
158-
}
62+
};
15963

16064
processLine(line: string) {
161-
line = line.trim();
162-
163-
if (!line.length) {
164-
return;
165-
}
65+
this.jsonParser.processLine(line);
66+
}
16667

167-
if (this.state === null) {
168-
this.handleRoot(line);
169-
} else if (this.state === "rootKeys") {
170-
this.handleRootKeys(line);
171-
} else if (this.state === "meta") {
172-
this.handleMeta(line);
173-
} else if (this.state === "data") {
174-
this.handleData(line);
175-
} else if (this.state === "meta-array") {
176-
this.handleMetaArray(line);
177-
} else if (this.state === "data-array") {
178-
this.handleDataArray(line);
179-
} else if (this.state === "query") {
180-
this.handleQuery(line);
181-
} else if (this.state === "query-object") {
182-
this.handleQueryObject(line);
183-
}
68+
getResult(index: number) {
69+
return this.jsonParser.results[index];
18470
}
18571
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import { JSONParser } from "./parser";
2+
3+
const body1 = `
4+
{
5+
"query":
6+
{
7+
"query_id": "50cd4109-02de-4e19-b995-5e71a5f16fb7"
8+
},
9+
"meta":
10+
[
11+
{
12+
"name": "engine_name",
13+
"type": "text"
14+
}
15+
],
16+
17+
"data":
18+
[
19+
["peacekeeper_ns_2023_01_20_08_06_45_153_create_engine_test"],
20+
["Aymeric_test_2_Analytics"],
21+
["integration_testing_windowslatest_37_1675858870"],
22+
["test_4"],
23+
["peacekeeper_ns_2023_01_21_11_03_09_210_CREATE_ENGINE_TEST_2_3"]
24+
],
25+
26+
"rows": 5,
27+
28+
"rows_before_limit_at_least": 233,
29+
30+
"statistics":
31+
{
32+
"elapsed": 0.42408089,
33+
"rows_read": 233,
34+
"bytes_read": 30680,
35+
"time_before_execution": 0.000588018,
36+
"time_to_execute": 0.423289816,
37+
"scanned_bytes_cache": 0,
38+
"scanned_bytes_storage": 0
39+
}
40+
}
41+
`;
42+
const body2 = `
43+
{
44+
"query":
45+
{
46+
"query_id": "50cd4109-02de-4e19-b995-5e71a5f16fb9"
47+
},
48+
"meta":
49+
[
50+
{
51+
"name": "engine_name",
52+
"type": "text"
53+
}
54+
],
55+
56+
"data":
57+
[
58+
["peacekeeper_ns_2023_01_20_08_06_45_153_create_engine_test"],
59+
["Aymeric_test_2_Analytics"],
60+
["integration_testing_windowslatest_37_1675858870"]
61+
],
62+
63+
"rows": 3,
64+
65+
"rows_before_limit_at_least": 233,
66+
67+
"statistics":
68+
{
69+
"elapsed": 0.42408089,
70+
"rows_read": 233,
71+
"bytes_read": 30680,
72+
"time_before_execution": 0.000588018,
73+
"time_to_execute": 0.423289816,
74+
"scanned_bytes_cache": 0,
75+
"scanned_bytes_storage": 0
76+
}
77+
}
78+
`;
79+
describe("parser", () => {
80+
it("handles single reponse", () => {
81+
const parser = new JSONParser({});
82+
parser.processBody(body1);
83+
expect(parser.results[0].rows).toHaveLength(5);
84+
expect(parser.results[0].columns).toHaveLength(1);
85+
});
86+
it("handles multi response", () => {
87+
const parser = new JSONParser({});
88+
parser.processBody(`
89+
${body1}
90+
${body2}`);
91+
expect(parser.results[0].rows).toHaveLength(5);
92+
expect(parser.results[1].rows).toHaveLength(3);
93+
expect(parser.results[1].columns).toHaveLength(1);
94+
});
95+
});

0 commit comments

Comments
 (0)