diff --git a/package.json b/package.json index 1c3e7155..197e7030 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "build": "rm -fr ./build && tsc -p tsconfig.lib.json", "release": "standard-version", "test": "jest", - "test:ci": "jest --ci --bail", + "test:ci": "node --expose-gc ./node_modules/.bin/jest --ci --bail", "type-check": "tsc -p tsconfig.lib.json" }, "prettier": { diff --git a/src/statement/normalizeResponse.ts b/src/statement/normalizeResponse.ts index 2798819c..6ee02748 100644 --- a/src/statement/normalizeResponse.ts +++ b/src/statement/normalizeResponse.ts @@ -105,12 +105,14 @@ export const normalizeResponseRowStreaming = ( const { response: { normalizeData = false } = {} } = executeQueryOptions; const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow; - - return data.map((row: Row) => { - const hydratedRow = hydrate(row, meta, executeQueryOptions); + const result: Row[] = new Array(data.length); + for (let i = 0; i < data.length; i++) { + const hydratedRow = hydrate(data[i], meta, executeQueryOptions); if (normalizeData) { - return normalizeRow(hydratedRow, meta, executeQueryOptions); + result[i] = normalizeRow(hydratedRow, meta, executeQueryOptions); + } else { + result[i] = hydratedRow; } - return hydratedRow; - }); + } + return result; }; diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts index 18d7aa82..c51eacc6 100644 --- a/src/statement/stream/serverSideStream.ts +++ b/src/statement/stream/serverSideStream.ts @@ -1,76 +1,211 @@ import { Readable } from "stream"; import JSONbig from "json-bigint"; -import readline from "readline"; import { getNormalizedMeta, normalizeResponseRowStreaming } from "../normalizeResponse"; import { Response } from "node-fetch"; -import { ExecuteQueryOptions } from "../../types"; +import { ExecuteQueryOptions, Row } from "../../types"; import { Meta } from "../../meta"; export class ServerSideStream extends Readable { private meta: Meta[] = []; + private readonly pendingRows: Row[] = []; + private finished = false; + private processingData = false; + private inputPaused = false; + private readonly bufferGrowthThreshold = 10; // Stop adding to buffer when over this many rows are already in + private lineBuffer = ""; + private sourceStream: NodeJS.ReadableStream | null = null; + constructor( private readonly response: Response, private readonly executeQueryOptions: ExecuteQueryOptions ) { super({ objectMode: true }); - const readLine = readline.createInterface({ - input: response.body, - crlfDelay: Infinity + this.setupInputStream(); + } + + private setupInputStream() { + this.sourceStream = this.response.body; + + if (!this.sourceStream) { + this.destroy(new Error("Response body is null or undefined")); + return; + } + + this.sourceStream.on("data", (chunk: Buffer) => { + this.handleData(chunk); }); - const lineParser = (line: string) => { - try { - if (line.trim()) { - const parsed = JSONbig.parse(line); - if (parsed) { - if (parsed.message_type === "DATA") { - this.processData(parsed); - } else if (parsed.message_type === "START") { - this.meta = getNormalizedMeta(parsed.result_columns); - this.emit("meta", this.meta); - } else if (parsed.message_type === "FINISH_SUCCESSFULLY") { - this.push(null); - } else if (parsed.message_type === "FINISH_WITH_ERRORS") { - this.destroy( - new Error( - `Result encountered an error: ${parsed.errors - .map((error: { description: string }) => error.description) - .join("\n")}` - ) - ); - } - } else { - this.destroy(new Error(`Result 
row could not be parsed: ${line}`)); + this.sourceStream.on("end", () => { + this.handleInputEnd(); + }); + + this.sourceStream.on("error", (err: Error) => { + this.destroy(err); + }); + } + + private handleData(chunk: Buffer) { + // Convert chunk to string and add to line buffer + this.lineBuffer += chunk.toString(); + + // Process complete lines + let lineStart = 0; + let lineEnd = this.lineBuffer.indexOf("\n", lineStart); + + while (lineEnd !== -1) { + const line = this.lineBuffer.slice(lineStart, lineEnd); + this.processLine(line.trim()); + + lineStart = lineEnd + 1; + lineEnd = this.lineBuffer.indexOf("\n", lineStart); + } + + // Keep remaining partial line in buffer + this.lineBuffer = this.lineBuffer.slice(lineStart); + + // Apply backpressure if we have too many pending rows + if ( + this.pendingRows.length > this.bufferGrowthThreshold && + this.sourceStream && + !this.inputPaused && + !this.processingData + ) { + this.sourceStream.pause(); + this.inputPaused = true; + } + } + + private handleInputEnd() { + // Process any remaining line in buffer + if (this.lineBuffer.trim()) { + this.processLine(this.lineBuffer.trim()); + this.lineBuffer = ""; + } + + this.finished = true; + this.tryPushPendingData(); + } + + private processLine(line: string) { + if (!line) return; + + try { + const parsed = JSONbig.parse(line); + if (parsed) { + if (parsed.message_type === "DATA") { + this.handleDataMessage(parsed); + } else if (parsed.message_type === "START") { + this.meta = getNormalizedMeta(parsed.result_columns); + this.emit("meta", this.meta); + } else if (parsed.message_type === "FINISH_SUCCESSFULLY") { + this.finished = true; + this.tryPushPendingData(); + } else if (parsed.message_type === "FINISH_WITH_ERRORS") { + // Ensure source stream is resumed before destroying to prevent hanging + if (this.sourceStream && this.inputPaused) { + this.sourceStream.resume(); + this.inputPaused = false; } + this.destroy( + new Error( + `Result encountered an error: ${parsed.errors + .map((error: { description: string }) => error.description) + .join("\n")}` + ) + ); } - } catch (err) { - this.destroy(err); + } else { + this.destroy(new Error(`Result row could not be parsed: ${line}`)); } - }; - readLine.on("line", lineParser); - - readLine.on("close", () => { - this.push(null); - }); + } catch (err) { + this.destroy(err); + } } - private processData(parsed: { data: any[] }) { + private handleDataMessage(parsed: { data: unknown[] }) { if (parsed.data) { + // Process rows one by one to handle backpressure properly const normalizedData = normalizeResponseRowStreaming( parsed.data, this.executeQueryOptions, this.meta ); - for (const data of normalizedData) { - this.emit("data", data); + + // Add to pending rows buffer + this.pendingRows.push(...normalizedData); + + // Try to push data immediately if not already processing + if (!this.processingData) { + this.tryPushPendingData(); + } + } + } + + private tryPushPendingData() { + if (this.processingData || this.destroyed) { + return; + } + + this.processingData = true; + + while (this.pendingRows.length > 0) { + const row = this.pendingRows.shift(); + const canContinue = this.push(row); + + // If push returns false, stop pushing and wait for _read to be called + if (!canContinue) { + this.processingData = false; + return; } } + + // If we've finished processing all data and the server indicated completion + if (this.finished && this.pendingRows.length === 0) { + this.push(null); + this.processingData = false; + return; + } + + this.processingData = false; } 
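+
+  // Descriptive note on the flow implemented above: handleData() accumulates raw
+  // chunks into lineBuffer and hands each complete NDJSON line to processLine();
+  // DATA messages are normalized in handleDataMessage() and queued in pendingRows;
+  // tryPushPendingData() drains that queue until push() reports backpressure, and
+  // pushes null once the server has signalled completion and the queue is empty.
+  // _read() below is the Readable hook that resumes draining (and un-pauses the
+  // source stream) once the consumer is ready for more rows.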
_read() { - /* _read method requires implementation, even if data comes from other sources */ + // Called when the stream is ready for more data + if (!this.processingData && this.pendingRows.length > 0) { + this.tryPushPendingData(); + } + + // Also resume source stream if it was paused and we have capacity + if ( + this.sourceStream && + this.inputPaused && + this.pendingRows.length < this.bufferGrowthThreshold + ) { + this.sourceStream.resume(); + this.inputPaused = false; + } + } + + _destroy(err: Error | null, callback: (error?: Error | null) => void) { + if (this.sourceStream) { + // Resume stream if paused to ensure proper cleanup + if (this.inputPaused) { + this.sourceStream.resume(); + this.inputPaused = false; + } + + // Only call destroy if it exists (for Node.js streams) + const destroyableStream = this.sourceStream as unknown as { + destroy?: () => void; + }; + if (typeof destroyableStream.destroy === "function") { + destroyableStream.destroy(); + } + this.sourceStream = null; + } + callback(err); } } diff --git a/test/integration/v2/performance.test.ts b/test/integration/v2/performance.test.ts new file mode 100644 index 00000000..75fb6be0 --- /dev/null +++ b/test/integration/v2/performance.test.ts @@ -0,0 +1,329 @@ +import { Firebolt } from "../../../src/index"; +import * as stream from "stream"; + +const connectionParams = { + auth: { + client_id: process.env.FIREBOLT_CLIENT_ID as string, + client_secret: process.env.FIREBOLT_CLIENT_SECRET as string + }, + account: process.env.FIREBOLT_ACCOUNT as string, + database: process.env.FIREBOLT_DATABASE as string, + engineName: process.env.FIREBOLT_ENGINE_NAME as string +}; + +jest.setTimeout(250000); + +describe("performance comparison", () => { + const generateLargeResultQuery = (rows: number) => ` + SELECT + i as id, + 'user_' || i::string as username, + 'email_' || i::string || '@example.com' as email, + CASE WHEN i % 2 = 0 THEN true ELSE false END as status, + CAST('100000000000000000' as BIGINT) as big_number, + '2024-01-01'::date + (i % 365) as created_date, + RANDOM() * 1000 as score, + 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' 
as description + FROM generate_series(1, ${rows}) as i + `; + + it("compare normal vs streaming execution performance", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + const seriesNum = 100000; + const query = generateLargeResultQuery(seriesNum); + + console.log(`\nTesting performance with ${seriesNum} rows...`); + + // Test normal execution (fetchResult) + const normalStartTime = process.hrtime.bigint(); + + const statement = await connection.execute(query, { + response: { + normalizeData: true, + bigNumberAsString: false + } + }); + + const { data: normalData } = await statement.fetchResult(); + const normalEndTime = process.hrtime.bigint(); + + const normalExecutionTime = Number(normalEndTime - normalStartTime) / 1e6; // Convert to milliseconds + + console.log(`Normal execution: ${normalExecutionTime.toFixed(2)}ms`); + expect(normalData.length).toBe(seriesNum); + + // Test streaming execution (streamResult) + const streamStartTime = process.hrtime.bigint(); + + const streamStatement = await connection.executeStream(query, { + response: { + normalizeData: true, + bigNumberAsString: false + } + }); + + const { data: streamData } = await streamStatement.streamResult(); + + let streamRowCount = 0; + + // Process streaming data + const processStreamData = new Promise((resolve, reject) => { + streamData.on("data", () => { + streamRowCount++; + }); + + streamData.on("end", () => { + resolve(); + }); + + streamData.on("error", error => { + reject(error); + }); + }); + + await processStreamData; + const streamEndTime = process.hrtime.bigint(); + + const streamExecutionTime = Number(streamEndTime - streamStartTime) / 1e6; // Convert to milliseconds + + console.log(`Stream execution: ${streamExecutionTime.toFixed(2)}ms`); + expect(streamRowCount).toBe(seriesNum); + + // Performance analysis + const timeDifference = normalExecutionTime - streamExecutionTime; + + console.log(`\nPerformance Analysis:`); + console.log( + `Time difference: ${timeDifference.toFixed(2)}ms (${ + timeDifference > 0 ? 
"streaming faster" : "normal faster" + })` + ); + console.log( + `Speed ratio: ${(normalExecutionTime / streamExecutionTime).toFixed(2)}x` + ); + + // Verify both methods processed the same number of rows + expect(streamRowCount).toBe(normalData.length); // Same number of rows processed + + // Ensure streaming is not more than 10% slower than normal execution + const maxAllowedStreamTime = normalExecutionTime * 1.1; // 10% slower threshold + expect(streamExecutionTime).toBeLessThanOrEqual(maxAllowedStreamTime); + }); + + it("compare streaming vs normal with pipeline processing", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + const seriesNum = 50000; // Smaller dataset for pipeline comparison + const query = generateLargeResultQuery(seriesNum); + + console.log(`\nTesting pipeline performance with ${seriesNum} rows...`); + + // Test normal execution with manual processing + const normalStartTime = process.hrtime.bigint(); + + const statement = await connection.execute(query, { + response: { + normalizeData: true, + bigNumberAsString: false + } + }); + + const { data: normalData } = await statement.fetchResult(); + + // Simulate processing (similar to what streaming pipeline would do) + let processedNormalCount = 0; + const processedNormalData: string[] = []; + for (const row of normalData) { + processedNormalCount++; + processedNormalData.push(JSON.stringify(row)); + } + + const normalEndTime = process.hrtime.bigint(); + + const normalExecutionTime = Number(normalEndTime - normalStartTime) / 1e6; + + console.log(`Normal with processing: ${normalExecutionTime.toFixed(2)}ms`); + + // Test streaming with pipeline processing + const streamStartTime = process.hrtime.bigint(); + + const streamStatement = await connection.executeStream(query, { + response: { + normalizeData: true, + bigNumberAsString: false + } + }); + + const { data: streamData } = await streamStatement.streamResult(); + + let processedStreamCount = 0; + + // Create processing pipeline + const jsonTransform = new stream.Transform({ + objectMode: true, + transform( + row: unknown, + encoding: BufferEncoding, + callback: (error?: Error | null) => void + ) { + try { + processedStreamCount++; + + const json = JSON.stringify(row); + this.push(json); + callback(); + } catch (err) { + callback(err as Error); + } + } + }); + + const processedStreamData: string[] = []; + const collectStream = new stream.Writable({ + objectMode: true, + write(chunk: string, encoding, callback) { + processedStreamData.push(chunk); + callback(); + } + }); + + // Use pipeline for proper backpressure handling + await stream.promises.pipeline(streamData, jsonTransform, collectStream); + + const streamEndTime = process.hrtime.bigint(); + + const streamExecutionTime = Number(streamEndTime - streamStartTime) / 1e6; + + console.log(`Stream with pipeline: ${streamExecutionTime.toFixed(2)}ms`); + + // Verify results + expect(processedStreamCount).toBe(seriesNum); + expect(processedNormalCount).toBe(seriesNum); + expect(processedStreamData.length).toBe(processedNormalData.length); + + // Performance analysis + const timeDifference = normalExecutionTime - streamExecutionTime; + + console.log(`\nPipeline Performance Analysis:`); + console.log( + `Time difference: ${timeDifference.toFixed(2)}ms (${ + timeDifference > 0 ? 
"streaming faster" : "normal faster" + })` + ); + console.log( + `Speed ratio: ${(normalExecutionTime / streamExecutionTime).toFixed(2)}x` + ); + console.log( + `Processing efficiency: ${( + processedStreamCount / processedNormalCount + ).toFixed(2)}x` + ); + + // Verify results + expect(processedStreamCount).toBe(seriesNum); + expect(processedNormalCount).toBe(seriesNum); + expect(processedStreamData.length).toBe(processedNormalData.length); + + // Ensure streaming pipeline is not more than 10% slower than normal execution + const maxAllowedStreamTime = normalExecutionTime * 1.1; // 10% slower threshold + expect(streamExecutionTime).toBeLessThanOrEqual(maxAllowedStreamTime); + }); + + it("execution time comparison with different dataset sizes", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + const testSizes = [10000, 25000, 50000]; // Different dataset sizes + + const results: Array<{ + size: number; + normalTime: number; + streamTime: number; + }> = []; + + for (const size of testSizes) { + console.log(`\nTesting dataset size: ${size} rows`); + const query = generateLargeResultQuery(size); + + // Test normal execution + const normalStartTime = process.hrtime.bigint(); + + const statement = await connection.execute(query, { + response: { normalizeData: true, bigNumberAsString: false } + }); + + const { data: normalData } = await statement.fetchResult(); + const normalEndTime = process.hrtime.bigint(); + + const normalTime = Number(normalEndTime - normalStartTime) / 1e6; + + // Test streaming execution + const streamStartTime = process.hrtime.bigint(); + + const streamStatement = await connection.executeStream(query, { + response: { normalizeData: true, bigNumberAsString: false } + }); + + const { data: streamData } = await streamStatement.streamResult(); + + let streamRowCount = 0; + + await new Promise((resolve, reject) => { + streamData.on("data", () => { + streamRowCount++; + }); + + streamData.on("end", resolve); + streamData.on("error", reject); + }); + + const streamEndTime = process.hrtime.bigint(); + const streamTime = Number(streamEndTime - streamStartTime) / 1e6; + + results.push({ + size, + normalTime, + streamTime + }); + + console.log( + `Size ${size}: Normal(${normalTime.toFixed( + 2 + )}ms) vs Stream(${streamTime.toFixed(2)}ms)` + ); + + expect(normalData.length).toBe(size); + expect(streamRowCount).toBe(size); + } + + // Analysis across different sizes + console.log(`\nExecution Time Scaling Analysis:`); + for (const result of results) { + const timeRatio = result.streamTime / result.normalTime; + const timeDifference = result.normalTime - result.streamTime; + + console.log( + `Size ${result.size}: Time difference ${timeDifference.toFixed( + 2 + )}ms, Speed ratio ${timeRatio.toFixed(2)}x` + ); + + // Both methods should complete successfully + expect(result.normalTime).toBeGreaterThan(0); + expect(result.streamTime).toBeGreaterThan(0); + + // Ensure streaming is not more than 10% slower than normal execution for each dataset size + const maxAllowedStreamTime = result.normalTime * 1.1; // 10% slower threshold + expect(result.streamTime).toBeLessThanOrEqual(maxAllowedStreamTime); + } + }); +}); diff --git a/test/integration/v2/stream-data-types.test.ts b/test/integration/v2/stream-data-types.test.ts new file mode 100644 index 00000000..d4e4b343 --- /dev/null +++ b/test/integration/v2/stream-data-types.test.ts @@ -0,0 +1,286 @@ +import { Firebolt } 
from "../../../src"; +import stream from "node:stream"; +import fs from "node:fs"; +import path from "node:path"; +import os from "node:os"; +import BigNumber from "bignumber.js"; + +const connectionParams = { + auth: { + client_id: process.env.FIREBOLT_CLIENT_ID as string, + client_secret: process.env.FIREBOLT_CLIENT_SECRET as string + }, + account: process.env.FIREBOLT_ACCOUNT as string, + database: process.env.FIREBOLT_DATABASE as string, + engineName: process.env.FIREBOLT_ENGINE_NAME as string +}; + +jest.setTimeout(350000); + +describe("advanced stream tests", () => { + it("stream with different data types and memory management", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + // Generate a query with various data types + const seriesNum = 100000; + const generateLargeResultQuery = (rows: number) => ` + SELECT + i as id, + 'user_' || i::string as username, + 'email_' || i::string || '@example.com' as email, + CASE WHEN i % 2 = 0 THEN true ELSE false END as status, + CAST('100000000000000000' as BIGINT) as big_number, + '2024-01-01'::date + (i % 365) as created_date, + RANDOM() * 1000 as score, + 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' as description + FROM generate_series(1, ${rows}) as i + `; + + const statement = await connection.executeStream( + generateLargeResultQuery(seriesNum), + { + response: { + normalizeData: true, + bigNumberAsString: false + } + } + ); + + const { data } = await statement.streamResult(); + + // Add meta event handler to verify column metadata + data.on("meta", m => { + expect(m).toEqual([ + { name: "id", type: "int" }, + { name: "username", type: "text" }, + { name: "email", type: "text" }, + { name: "status", type: "boolean" }, + { name: "big_number", type: "long" }, + { name: "created_date", type: "date" }, + { name: "score", type: "double" }, + { name: "description", type: "text" } + ]); + }); + + // Buffer pool configuration + const poolSize = 8192; // 8KB + const poolBuffer = Buffer.allocUnsafe(poolSize); + const newlineCode = 0x0a; // '\n' character code + + // Track memory usage + const initialMemory = process.memoryUsage(); + let maxMemoryUsed = initialMemory.heapUsed; + let rowCount = 0; + let idSum = 0; // Track sum of id column for data integrity verification + + // Create a JSON transform stream with minimal allocation + const jsonTransform = new stream.Transform({ + objectMode: true, + highWaterMark: 1, // Limit buffering - critical for memory + transform( + row: unknown, + encoding: BufferEncoding, + callback: (error?: Error | null) => void + ) { + try { + rowCount++; + + // Add the id to our sum for data integrity verification + const typedRow = row as Record; + const id = typedRow.id as number; + idSum += id; + + if (rowCount % 5000 === 0) { + const currentMemory = process.memoryUsage(); + maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed); + if (global.gc) { + console.log(`Invoking GC at row ${rowCount}`); + global.gc(); + } + } + + // Verify data types are correct for normalized data on first row + if (rowCount === 1) { + const typedRow = row as Record; + + // Verify actual values for first row + expect(typedRow.id).toBe(1); + expect(typedRow.username).toBe("user_1"); + expect(typedRow.email).toBe("email_1@example.com"); + expect(typedRow.status).toBe(false); // i=1, 1%2=1, so false + 
expect(typedRow.big_number).toEqual( + new BigNumber("100000000000000000") + ); + expect(typedRow.created_date).toEqual(new Date("2024-01-02")); // 2024-01-01 + 1 day + expect(typedRow.description).toBe( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua." + ); + } + + // Verify data types are correct for normalized data on last row + if (rowCount === seriesNum) { + const typedRow = row as Record; + + // Verify actual values for last row + expect(typedRow.id).toBe(seriesNum); + expect(typedRow.username).toBe(`user_${seriesNum}`); + expect(typedRow.email).toBe(`email_${seriesNum}@example.com`); + expect(typedRow.status).toBe(true); // seriesNum=100000, 100000%2=0, so true + expect(typedRow.big_number).toEqual( + new BigNumber("100000000000000000") + ); + expect(typedRow.created_date).toEqual(new Date("2024-12-21")); // 2024-01-01 + (100000 % 365) = 2024-01-01 + 269 days + expect(typedRow.description).toBe( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua." + ); + } + + const json = JSON.stringify(row); + const jsonLen = Buffer.byteLength(json); + const totalLen = jsonLen + 1; + + let buffer: Buffer; + if (totalLen <= poolSize) { + // Use pool for small rows - no allocation + poolBuffer.write(json, 0, jsonLen); + poolBuffer[jsonLen] = newlineCode; + buffer = poolBuffer.subarray(0, totalLen); + } else { + // Allocate for large rows + buffer = Buffer.allocUnsafe(totalLen); + buffer.write(json, 0, jsonLen); + buffer[jsonLen] = newlineCode; + } + + this.push(buffer); + callback(); + } catch (err) { + callback(err as Error); + } + } + }); + + // Create a batch processing stream that writes to a temporary file + let processedChunks = 0; + const batchSize = 100; + let batchBuffer: string[] = []; + + // Create a temporary file for writing + const tempDir = os.tmpdir(); + const tempFilePath = path.join( + tempDir, + `firebolt-stream-test-${Date.now()}.jsonl` + ); + const fileWriteStream = fs.createWriteStream(tempFilePath, { flags: "w" }); + + const outputStream = new stream.Writable({ + highWaterMark: 1, // Reasonable buffer size + write(chunk: Buffer, encoding, callback) { + processedChunks++; + const chunkStr = chunk.toString(); + batchBuffer.push(chunkStr); + + // Process in batches to create natural backpressure patterns + if (batchBuffer.length >= batchSize) { + // Process the batch synchronously (simulate some work) + const batchData = batchBuffer.join(""); + const lines = batchData.split("\n").filter(line => line.trim()); + + // Write valid JSON lines to the file + for (const line of lines) { + try { + JSON.parse(line); // Verify it's valid JSON + fileWriteStream.write(line + "\n"); + } catch (e) { + // Skip invalid JSON lines + } + } + + // Clear the batch + batchBuffer = []; + } + + callback(); + }, + + final(callback) { + // Process any remaining items in the final batch + if (batchBuffer.length > 0) { + const batchData = batchBuffer.join(""); + const lines = batchData.split("\n").filter(line => line.trim()); + + // Write remaining valid JSON lines to the file + for (const line of lines) { + try { + JSON.parse(line); // Verify it's valid JSON + fileWriteStream.write(line + "\n"); + } catch (e) { + // Skip invalid JSON lines + } + } + batchBuffer = []; + } + + // Close the file stream + fileWriteStream.end(() => { + callback(); + }); + } + }); + + // Use pipeline for proper backpressure handling + await stream.promises.pipeline(data, 
jsonTransform, outputStream); + + // Verify everything worked correctly + expect(rowCount).toBe(seriesNum); + expect(processedChunks).toBeGreaterThan(0); + + // Verify data integrity: sum of 1 to N should be N*(N+1)/2 + const expectedSum = (seriesNum * (seriesNum + 1)) / 2; + expect(idSum).toBe(expectedSum); + + // Verify the file was created and has content + expect(fs.existsSync(tempFilePath)).toBe(true); + const fileStats = fs.statSync(tempFilePath); + expect(fileStats.size).toBeGreaterThan(0); + + // Read a few lines from the file to verify JSON format + const fileContent = fs.readFileSync(tempFilePath, "utf-8"); + const lines = fileContent.split("\n").filter(line => line.trim()); + expect(lines.length).toBeGreaterThan(0); + + // Verify first line is valid JSON + if (lines.length > 0) { + const firstRow = JSON.parse(lines[0]); + expect(firstRow).toHaveProperty("id"); + expect(firstRow).toHaveProperty("username"); + } + + // Clean up the temporary file + fs.unlinkSync(tempFilePath); + + // Memory usage should remain reasonable with proper streaming + const memoryGrowth = + (maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024); + + if (global.gc) { + expect(memoryGrowth).toBeLessThan(30); + } else { + expect(memoryGrowth).toBeLessThan(100); + } + + console.log( + `Data types streaming test: processed ${rowCount} rows with various data types, ` + + `memory growth: ${memoryGrowth.toFixed( + 2 + )} MB, processed chunks: ${processedChunks}, file size: ${( + fileStats.size / + 1024 / + 1024 + ).toFixed(2)} MB` + ); + }); +}); diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 4ab00ab7..095c0237 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -1,3 +1,4 @@ +import exp from "node:constants"; import { Firebolt } from "../../../src"; import stream, { TransformCallback } from "node:stream"; @@ -11,7 +12,7 @@ const connectionParams = { engineName: process.env.FIREBOLT_ENGINE_NAME as string }; -jest.setTimeout(250000); +jest.setTimeout(350000); describe("streams", () => { it("check sum from stream result", async () => { @@ -81,7 +82,7 @@ describe("streams", () => { super({ objectMode: true, transform( - row: any, + row: unknown, encoding: BufferEncoding, callback: TransformCallback ) { @@ -146,6 +147,12 @@ describe("streams", () => { } ]); }); + + // Consume data to prevent backpressure from blocking error messages + data.on("data", () => { + // Just consume the data, don't need to do anything with it + }); + const [error] = await stream.once(data, "error"); expect(error.message).toEqual( "Result encountered an error: Line 1, Column 9: Division by zero\n" + @@ -153,4 +160,107 @@ describe("streams", () => { " ^" ); }); + it("stream backpressure and memory management", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + // Test with a moderate size dataset to verify proper streaming behavior + const statement = await connection.executeStream( + "select i as id from generate_series(1, 1000000) as i" + ); + + const { data } = await statement.streamResult(); + + // Track memory usage and verify proper data streaming + const initialMemory = process.memoryUsage(); + let maxMemoryUsed = initialMemory.heapUsed; + let rowCount = 0; + let idSum = 0; + + // Process data with simple event handlers (like existing tests) + data.on("data", row => { + rowCount++; + idSum += row[0]; // Sum the id values for 
data integrity + + // Track memory usage periodically + if (rowCount % 50 === 0) { + const currentMemory = process.memoryUsage(); + maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed); + } + }); + + // Wait for stream completion + await new Promise((resolve, reject) => { + data.on("end", resolve); + data.on("error", reject); + }); + + // Verify the data was processed correctly + expect(rowCount).toBe(1000000); + + // Verify data integrity with sum check + const expectedSum = (1000000 * 1000001) / 2; // Sum of 1 to 1,000,000 + expect(idSum).toBe(expectedSum); + + // Memory usage should remain reasonable with proper streaming + const memoryGrowth = + (maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024); + + // Memory growth should be minimal for this size dataset + expect(memoryGrowth).toBeLessThan(20); + + console.log( + `Streaming test: processed ${rowCount} rows, ` + + `memory growth: ${memoryGrowth.toFixed(2)} MB` + ); + }); + + it("stream with pipeline and backpressure", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + // Test with transform streams and pipeline for better backpressure handling + const statement = await connection.executeStream( + "select i as id, 'test_data_' || i::string as data from generate_series(1, 1000000) as i" + ); + + const { data } = await statement.streamResult(); + + let processedCount = 0; + + // Create a simple counting transform + const countingTransform = new stream.Transform({ + objectMode: true, + transform(chunk, encoding, callback) { + processedCount++; + // Pass through the data + this.push(chunk); + callback(); + } + }); + + // Create a collecting writable + const rows: unknown[] = []; + const collectStream = new stream.Writable({ + objectMode: true, + write(chunk, encoding, callback) { + rows.push(chunk); + callback(); + } + }); + + // Use pipeline for proper backpressure handling + await stream.promises.pipeline(data, countingTransform, collectStream); + + // Verify everything worked correctly + expect(processedCount).toBe(1000000); + expect(rows.length).toBe(1000000); + expect(rows[0]).toEqual([1, "test_data_1"]); + + console.log(`Pipeline test: processed ${processedCount} rows successfully`); + }); }); diff --git a/test/unit/v2/serverSideStream.test.ts b/test/unit/v2/serverSideStream.test.ts new file mode 100644 index 00000000..91da0897 --- /dev/null +++ b/test/unit/v2/serverSideStream.test.ts @@ -0,0 +1,438 @@ +import { PassThrough } from "stream"; +import { Response } from "node-fetch"; +import { ServerSideStream } from "../../../src/statement/stream/serverSideStream"; +import { ExecuteQueryOptions } from "../../../src/types"; + +describe("ServerSideStream", () => { + let mockResponse: Partial; + let executeQueryOptions: ExecuteQueryOptions; + + beforeEach(() => { + mockResponse = {}; + executeQueryOptions = {}; + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe("backpressure and pause/resume functionality", () => { + it("should pause source stream when pending rows exceed bufferGrowthThreshold", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + let pauseCalled = false; + + // Mock the pause method + sourceStream.pause = jest.fn(() => { + pauseCalled = true; + return sourceStream; + }); + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + serverSideStream.on("error", done); + + // Send 
START message first + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Wait for start message to be processed, then setup backpressure + setTimeout(() => { + // Mock push to always return false to create backpressure + const originalPush = serverSideStream.push.bind(serverSideStream); + serverSideStream.push = jest.fn(chunk => { + if (chunk !== null) { + return false; // Always return false for data chunks + } + return originalPush(chunk); // Allow null (end) to pass through + }); + + // Send a large single data message with many rows to ensure we exceed threshold + const largeDataMessage = + JSON.stringify({ + message_type: "DATA", + data: Array.from({ length: 15 }, (_, i) => [i]) // 15 rows, exceeds threshold of 10 + }) + "\n"; + + sourceStream.write(largeDataMessage); + + // Check after a delay + setTimeout(() => { + expect(pauseCalled).toBe(true); + done(); + }, 50); + }, 10); + }); + + it("should resume source stream when pending rows drop below threshold", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + let pauseCalled = false; + let resumeCalled = false; + + // Set up mocks AFTER ServerSideStream is created to avoid initialization issues + sourceStream.pause = jest.fn(() => { + pauseCalled = true; + return sourceStream; + }); + + sourceStream.resume = jest.fn(() => { + resumeCalled = true; + return sourceStream; + }); + + serverSideStream.on("error", done); + + // Send START message first + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + setTimeout(() => { + // Mock push to always return false to accumulate rows and trigger pause + const originalPush = serverSideStream.push.bind(serverSideStream); + serverSideStream.push = jest.fn(chunk => { + if (chunk !== null) { + return false; // Always return false for data chunks to accumulate + } + return originalPush(chunk); + }); + + // Send large data message to exceed threshold and trigger pause + const largeDataMessage = + JSON.stringify({ + message_type: "DATA", + data: Array.from({ length: 15 }, (_, i) => [i]) + }) + "\n"; + + sourceStream.write(largeDataMessage); + + setTimeout(() => { + expect(pauseCalled).toBe(true); + + // Now test resume: change push to return true (allow processing) + serverSideStream.push = jest.fn(chunk => { + if (chunk !== null) { + return true; // Allow processing to drain buffer + } + return originalPush(chunk); + }); + + // Call _read to trigger the resume condition check + serverSideStream._read(); + + setTimeout(() => { + expect(resumeCalled).toBe(true); + done(); + }, 50); + }, 50); + }, 10); + }); + + it("should properly clean up on stream destruction", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + let resumeCalled = false; + + // Mock resume to track cleanup + sourceStream.resume = jest.fn(() => { + resumeCalled = true; + return sourceStream; + }); + + // Mock destroy method + sourceStream.destroy = jest.fn(); + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + // First pause the stream by creating backpressure + serverSideStream.push = jest.fn(() => { + return false; // Always return false to simulate backpressure + }); + + // Send start message + sourceStream.write( + 
JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send data to trigger pause + // Send messages one at a time to trigger backpressure check between messages + for (let i = 0; i < 6; i++) { + const dataMessage = + JSON.stringify({ + message_type: "DATA", + data: [[i * 2 + 0], [i * 2 + 1]] + }) + "\n"; + sourceStream.write(dataMessage); + } + + setTimeout(() => { + // Destroy the stream + serverSideStream.destroy(); + + // Check that cleanup happened + expect(resumeCalled).toBe(true); + expect(sourceStream.destroy).toHaveBeenCalled(); + done(); + }, 50); + }); + + it("should handle error messages and cleanup properly", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + serverSideStream.on("error", (error: Error) => { + // Check that error handling resumed the stream for cleanup + expect(error.message).toContain("Result encountered an error"); + done(); + }); + + serverSideStream.on("data", () => { + // No-op to consume data + }); + + // Send start message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send data one message at a time to trigger pause + for (let i = 0; i < 12; i++) { + const dataMessage = + JSON.stringify({ + message_type: "DATA", + data: [[i]] + }) + "\n"; + sourceStream.write(dataMessage); + } + + // Send error message + setTimeout(() => { + sourceStream.write( + JSON.stringify({ + message_type: "FINISH_WITH_ERRORS", + errors: [{ description: "Test error" }] + }) + "\n" + ); + }, 500); + }); + }); + + describe("basic functionality", () => { + it("should emit meta event on START message", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + let metaReceived = false; + + serverSideStream.on("meta", (meta: unknown) => { + metaReceived = true; + expect(meta).toBeDefined(); + expect(Array.isArray(meta)).toBe(true); + }); + + serverSideStream.on("error", done); + + // Add data event handler to consume the stream + serverSideStream.on("data", () => { + // No data expected, but this ensures stream consumption + }); + + // Send START message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send FINISH message to complete + sourceStream.write( + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + "\n" + ); + + sourceStream.end(); + + serverSideStream.on("end", () => { + expect(metaReceived).toBe(true); + done(); + }); + }); + + it("should emit data events for DATA messages", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + const dataEvents: unknown[] = []; + + serverSideStream.on("data", (data: unknown) => { + dataEvents.push(data); + }); + + serverSideStream.on("error", done); + + // Send START message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send DATA messages + sourceStream.write( + JSON.stringify({ + message_type: "DATA", + data: [[1], [2], [3]] + }) + "\n" + ); + + // Send FINISH message + 
sourceStream.write( + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + "\n" + ); + + sourceStream.end(); + + serverSideStream.on("end", () => { + expect(dataEvents).toHaveLength(3); + done(); + }); + }); + + it("should handle null response body", done => { + mockResponse.body = undefined; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + serverSideStream.on("error", (error: Error) => { + expect(error.message).toBe("Response body is null or undefined"); + done(); + }); + }); + + it("should handle malformed JSON gracefully", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + serverSideStream.on("error", (error: Error) => { + expect(error).toBeDefined(); + done(); + }); + + // Send malformed JSON + sourceStream.write("{ invalid json }\n"); + }); + + it("should handle partial lines in buffer correctly", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + let metaReceived = false; + let dataReceived = false; + + serverSideStream.on("meta", () => { + metaReceived = true; + }); + + serverSideStream.on("data", () => { + dataReceived = true; + }); + + serverSideStream.on("end", () => { + expect(metaReceived).toBe(true); + expect(dataReceived).toBe(true); + done(); + }); + + serverSideStream.on("error", done); + + // Send data in chunks that split JSON lines + const startMessage = + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n"; + + const dataMessage = + JSON.stringify({ + message_type: "DATA", + data: [[1]] + }) + "\n"; + + const finishMessage = + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + "\n"; + + // Split messages across multiple chunks + const fullMessage = startMessage + dataMessage + finishMessage; + const midPoint = Math.floor(fullMessage.length / 2); + + sourceStream.write(fullMessage.slice(0, midPoint)); + setTimeout(() => { + sourceStream.write(fullMessage.slice(midPoint)); + sourceStream.end(); + }, 10); + }); + }); +});
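
For reference, a minimal consumer sketch of the streaming path exercised by the tests above. It reuses the same Firebolt factory, executeStream/streamResult API, and environment-based connection parameters as the test files; the package import name ("firebolt-sdk") is an assumption and not taken from this diff.

import { Firebolt } from "firebolt-sdk"; // package name assumed; the tests import from ../src
import { pipeline } from "node:stream/promises";
import { Writable } from "node:stream";

async function consumeStream() {
  const firebolt = Firebolt({
    apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
  });
  const connection = await firebolt.connect({
    auth: {
      client_id: process.env.FIREBOLT_CLIENT_ID as string,
      client_secret: process.env.FIREBOLT_CLIENT_SECRET as string
    },
    account: process.env.FIREBOLT_ACCOUNT as string,
    database: process.env.FIREBOLT_DATABASE as string,
    engineName: process.env.FIREBOLT_ENGINE_NAME as string
  });

  // executeStream + streamResult expose an object-mode Readable (ServerSideStream);
  // consuming it through pipeline() lets its backpressure logic pause the HTTP body
  // instead of buffering the whole result set in memory.
  const statement = await connection.executeStream(
    "select i as id from generate_series(1, 1000000) as i"
  );
  const { data } = await statement.streamResult();

  let rowCount = 0;
  const sink = new Writable({
    objectMode: true,
    write(_row, _encoding, callback) {
      rowCount++;
      callback();
    }
  });

  await pipeline(data, sink);
  console.log(`consumed ${rowCount} rows`);
}

consumeStream().catch(console.error);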