Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"build": "rm -fr ./build && tsc -p tsconfig.lib.json",
"release": "standard-version",
"test": "jest",
"test:ci": "jest --ci --bail",
"test:ci": "node --expose-gc ./node_modules/.bin/jest --ci --bail",
"type-check": "tsc -p tsconfig.lib.json"
},
"prettier": {
Expand Down
14 changes: 8 additions & 6 deletions src/statement/normalizeResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,14 @@ export const normalizeResponseRowStreaming = (
const { response: { normalizeData = false } = {} } = executeQueryOptions;

const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow;

return data.map((row: Row) => {
const hydratedRow = hydrate(row, meta, executeQueryOptions);
const result: Row[] = new Array(data.length);
for (let i = 0; i < data.length; i++) {
const hydratedRow = hydrate(data[i], meta, executeQueryOptions);
if (normalizeData) {
return normalizeRow(hydratedRow, meta, executeQueryOptions);
result[i] = normalizeRow(hydratedRow, meta, executeQueryOptions);
} else {
result[i] = hydratedRow;
}
return hydratedRow;
});
}
return result;
};
215 changes: 175 additions & 40 deletions src/statement/stream/serverSideStream.ts
Original file line number Diff line number Diff line change
@@ -1,76 +1,211 @@
import { Readable } from "stream";
import JSONbig from "json-bigint";
import readline from "readline";
import {
getNormalizedMeta,
normalizeResponseRowStreaming
} from "../normalizeResponse";
import { Response } from "node-fetch";
import { ExecuteQueryOptions } from "../../types";
import { ExecuteQueryOptions, Row } from "../../types";
import { Meta } from "../../meta";

export class ServerSideStream extends Readable {
private meta: Meta[] = [];
private readonly pendingRows: Row[] = [];
private finished = false;
private processingData = false;
private inputPaused = false;
private readonly bufferGrowthThreshold = 10; // Stop adding to buffer when over this many rows are already in
private lineBuffer = "";
private sourceStream: NodeJS.ReadableStream | null = null;

constructor(
private readonly response: Response,
private readonly executeQueryOptions: ExecuteQueryOptions
) {
super({ objectMode: true });
const readLine = readline.createInterface({
input: response.body,
crlfDelay: Infinity
this.setupInputStream();
}

private setupInputStream() {
this.sourceStream = this.response.body;

if (!this.sourceStream) {
this.destroy(new Error("Response body is null or undefined"));
return;
}

this.sourceStream.on("data", (chunk: Buffer) => {
this.handleData(chunk);
});

const lineParser = (line: string) => {
try {
if (line.trim()) {
const parsed = JSONbig.parse(line);
if (parsed) {
if (parsed.message_type === "DATA") {
this.processData(parsed);
} else if (parsed.message_type === "START") {
this.meta = getNormalizedMeta(parsed.result_columns);
this.emit("meta", this.meta);
} else if (parsed.message_type === "FINISH_SUCCESSFULLY") {
this.push(null);
} else if (parsed.message_type === "FINISH_WITH_ERRORS") {
this.destroy(
new Error(
`Result encountered an error: ${parsed.errors
.map((error: { description: string }) => error.description)
.join("\n")}`
)
);
}
} else {
this.destroy(new Error(`Result row could not be parsed: ${line}`));
this.sourceStream.on("end", () => {
this.handleInputEnd();
});

this.sourceStream.on("error", (err: Error) => {
this.destroy(err);
});
}

private handleData(chunk: Buffer) {
// Convert chunk to string and add to line buffer
this.lineBuffer += chunk.toString();

// Process complete lines
let lineStart = 0;
let lineEnd = this.lineBuffer.indexOf("\n", lineStart);

while (lineEnd !== -1) {
const line = this.lineBuffer.slice(lineStart, lineEnd);
this.processLine(line.trim());

lineStart = lineEnd + 1;
lineEnd = this.lineBuffer.indexOf("\n", lineStart);
}

// Keep remaining partial line in buffer
this.lineBuffer = this.lineBuffer.slice(lineStart);

// Apply backpressure if we have too many pending rows
if (
this.pendingRows.length > this.bufferGrowthThreshold &&
this.sourceStream &&
!this.inputPaused &&
!this.processingData
) {
this.sourceStream.pause();
this.inputPaused = true;
}
}

private handleInputEnd() {
// Process any remaining line in buffer
if (this.lineBuffer.trim()) {
this.processLine(this.lineBuffer.trim());
this.lineBuffer = "";
}

this.finished = true;
this.tryPushPendingData();
}

private processLine(line: string) {
if (!line) return;

try {
const parsed = JSONbig.parse(line);
if (parsed) {
if (parsed.message_type === "DATA") {
this.handleDataMessage(parsed);
} else if (parsed.message_type === "START") {
this.meta = getNormalizedMeta(parsed.result_columns);
this.emit("meta", this.meta);
} else if (parsed.message_type === "FINISH_SUCCESSFULLY") {
this.finished = true;
this.tryPushPendingData();
} else if (parsed.message_type === "FINISH_WITH_ERRORS") {
// Ensure source stream is resumed before destroying to prevent hanging
if (this.sourceStream && this.inputPaused) {
this.sourceStream.resume();
this.inputPaused = false;
}
this.destroy(
new Error(
`Result encountered an error: ${parsed.errors
.map((error: { description: string }) => error.description)
.join("\n")}`
)
);
}
} catch (err) {
this.destroy(err);
} else {
this.destroy(new Error(`Result row could not be parsed: ${line}`));
}
};
readLine.on("line", lineParser);

readLine.on("close", () => {
this.push(null);
});
} catch (err) {
this.destroy(err);
}
}

private processData(parsed: { data: any[] }) {
private handleDataMessage(parsed: { data: unknown[] }) {
if (parsed.data) {
// Process rows one by one to handle backpressure properly
const normalizedData = normalizeResponseRowStreaming(
parsed.data,
this.executeQueryOptions,
this.meta
);
for (const data of normalizedData) {
this.emit("data", data);

// Add to pending rows buffer
this.pendingRows.push(...normalizedData);

// Try to push data immediately if not already processing
if (!this.processingData) {
this.tryPushPendingData();
}
}
}

private tryPushPendingData() {
if (this.processingData || this.destroyed) {
return;
}

this.processingData = true;

while (this.pendingRows.length > 0) {
const row = this.pendingRows.shift();
const canContinue = this.push(row);

// If push returns false, stop pushing and wait for _read to be called
if (!canContinue) {
this.processingData = false;
return;
}
}

// If we've finished processing all data and the server indicated completion
if (this.finished && this.pendingRows.length === 0) {
this.push(null);
this.processingData = false;
return;
}

this.processingData = false;
}

_read() {
/* _read method requires implementation, even if data comes from other sources */
// Called when the stream is ready for more data
if (!this.processingData && this.pendingRows.length > 0) {
this.tryPushPendingData();
}

// Also resume source stream if it was paused and we have capacity
if (
this.sourceStream &&
this.inputPaused &&
this.pendingRows.length < this.bufferGrowthThreshold
) {
this.sourceStream.resume();
this.inputPaused = false;
}
}

_destroy(err: Error | null, callback: (error?: Error | null) => void) {
if (this.sourceStream) {
// Resume stream if paused to ensure proper cleanup
if (this.inputPaused) {
this.sourceStream.resume();
this.inputPaused = false;
}

// Only call destroy if it exists (for Node.js streams)
const destroyableStream = this.sourceStream as unknown as {
destroy?: () => void;
};
if (typeof destroyableStream.destroy === "function") {
destroyableStream.destroy();
}
this.sourceStream = null;
}
callback(err);
}
}
Loading