From e2418bc48a5e4cf34cc1aa8386f023f333ee4a68 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Tue, 4 Nov 2025 16:35:00 +0000 Subject: [PATCH 01/11] fix memory leak --- src/statement/normalizeResponse.ts | 14 +-- src/statement/stream/serverSideStream.ts | 129 ++++++++++++++++++++--- test/integration/v2/stream.test.ts | 97 +++++++++++++++++ 3 files changed, 222 insertions(+), 18 deletions(-) diff --git a/src/statement/normalizeResponse.ts b/src/statement/normalizeResponse.ts index 2798819c..6ee02748 100644 --- a/src/statement/normalizeResponse.ts +++ b/src/statement/normalizeResponse.ts @@ -105,12 +105,14 @@ export const normalizeResponseRowStreaming = ( const { response: { normalizeData = false } = {} } = executeQueryOptions; const hydrate = executeQueryOptions?.response?.hydrateRow || hydrateRow; - - return data.map((row: Row) => { - const hydratedRow = hydrate(row, meta, executeQueryOptions); + const result: Row[] = new Array(data.length); + for (let i = 0; i < data.length; i++) { + const hydratedRow = hydrate(data[i], meta, executeQueryOptions); if (normalizeData) { - return normalizeRow(hydratedRow, meta, executeQueryOptions); + result[i] = normalizeRow(hydratedRow, meta, executeQueryOptions); + } else { + result[i] = hydratedRow; } - return hydratedRow; - }); + } + return result; }; diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts index 18d7aa82..21eb309c 100644 --- a/src/statement/stream/serverSideStream.ts +++ b/src/statement/stream/serverSideStream.ts @@ -6,18 +6,29 @@ import { normalizeResponseRowStreaming } from "../normalizeResponse"; import { Response } from "node-fetch"; -import { ExecuteQueryOptions } from "../../types"; +import { ExecuteQueryOptions, Row } from "../../types"; import { Meta } from "../../meta"; export class ServerSideStream extends Readable { private meta: Meta[] = []; + private readlineInterface: readline.Interface | null = null; + private pendingRows: Row[] = []; + private finished = 
false; + private processingData = false; + private readlineInterfacePaused = false; + private readonly maxPendingRows = 10; // Limit pending rows to prevent memory buildup + constructor( private readonly response: Response, private readonly executeQueryOptions: ExecuteQueryOptions ) { super({ objectMode: true }); - const readLine = readline.createInterface({ - input: response.body, + this.setupReadline(); + } + + private setupReadline() { + this.readlineInterface = readline.createInterface({ + input: this.response.body, crlfDelay: Infinity }); @@ -27,13 +38,19 @@ export class ServerSideStream extends Readable { const parsed = JSONbig.parse(line); if (parsed) { if (parsed.message_type === "DATA") { - this.processData(parsed); + this.handleDataMessage(parsed); } else if (parsed.message_type === "START") { this.meta = getNormalizedMeta(parsed.result_columns); this.emit("meta", this.meta); } else if (parsed.message_type === "FINISH_SUCCESSFULLY") { - this.push(null); + this.finished = true; + this.tryPushPendingData(); } else if (parsed.message_type === "FINISH_WITH_ERRORS") { + // Ensure readline interface is resumed before destroying to prevent hanging + if (this.readlineInterface && this.readlineInterfacePaused) { + this.readlineInterface.resume(); + this.readlineInterfacePaused = false; + } this.destroy( new Error( `Result encountered an error: ${parsed.errors @@ -50,27 +67,115 @@ export class ServerSideStream extends Readable { this.destroy(err); } }; - readLine.on("line", lineParser); - readLine.on("close", () => { - this.push(null); + this.readlineInterface.on("line", lineParser); + + this.readlineInterface.on("close", () => { + this.finished = true; + this.tryPushPendingData(); + }); + + this.readlineInterface.on("error", err => { + this.destroy(err); }); } - private processData(parsed: { data: any[] }) { + private handleDataMessage(parsed: { data: unknown[] }) { if (parsed.data) { + // Process rows one by one to handle backpressure properly const 
normalizedData = normalizeResponseRowStreaming( parsed.data, this.executeQueryOptions, this.meta ); - for (const data of normalizedData) { - this.emit("data", data); + + // Add to pending rows buffer + this.pendingRows.push(...normalizedData); + + // If we have too many pending rows, pause the readline interface to apply backpressure + // Only pause if we're not already processing and have significantly exceeded the limit + if ( + this.pendingRows.length > this.maxPendingRows && + this.readlineInterface && + !this.readlineInterfacePaused && + !this.processingData + ) { + this.readlineInterface.pause(); + this.readlineInterfacePaused = true; + } + + // Try to push data immediately if not already processing + if (!this.processingData) { + this.tryPushPendingData(); + } + } + } + + private tryPushPendingData() { + if (this.processingData || this.destroyed) { + return; + } + + this.processingData = true; + + while (this.pendingRows.length > 0) { + const row = this.pendingRows.shift(); + const canContinue = this.push(row); + + // If pending rows dropped below threshold, resume the readline interface + if ( + this.pendingRows.length <= this.maxPendingRows / 4 && + this.readlineInterface && + this.readlineInterfacePaused + ) { + this.readlineInterface.resume(); + this.readlineInterfacePaused = false; + } + + // If push returns false, stop pushing and wait for _read to be called + if (!canContinue) { + this.processingData = false; + return; } } + + // If we've finished processing all data and the server indicated completion + if (this.finished && this.pendingRows.length === 0) { + this.push(null); + this.processingData = false; + return; + } + + this.processingData = false; } _read() { - /* _read method requires implementation, even if data comes from other sources */ + // Called when the stream is ready for more data + if (!this.processingData && this.pendingRows.length > 0) { + this.tryPushPendingData(); + } + + // Also resume readline interface if it was paused and we 
have capacity + if ( + this.readlineInterface && + this.readlineInterfacePaused && + this.pendingRows.length < this.maxPendingRows / 2 + ) { + this.readlineInterface.resume(); + this.readlineInterfacePaused = false; + } + } + + _destroy(err: Error | null, callback: (error?: Error | null) => void) { + if (this.readlineInterface) { + // Resume interface if paused to ensure proper cleanup + if (this.readlineInterfacePaused) { + this.readlineInterface.resume(); + this.readlineInterfacePaused = false; + } + this.readlineInterface.close(); + this.readlineInterface = null; + } + callback(err); } } diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 4ab00ab7..eb6db8a0 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -153,4 +153,101 @@ describe("streams", () => { " ^" ); }); + it("stream backpressure and memory management", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + // Test with a moderate size dataset to verify proper streaming behavior + const statement = await connection.executeStream( + "select i as id from generate_series(1, 1000000) as i" + ); + + const { data } = await statement.streamResult(); + + // Track memory usage and verify proper data streaming + const initialMemory = process.memoryUsage(); + let maxMemoryUsed = initialMemory.heapUsed; + let rowCount = 0; + + // Process data with simple event handlers (like existing tests) + data.on("data", () => { + rowCount++; + + // Track memory usage periodically + if (rowCount % 50 === 0) { + const currentMemory = process.memoryUsage(); + maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed); + } + }); + + // Wait for stream completion + await new Promise((resolve, reject) => { + data.on("end", resolve); + data.on("error", reject); + }); + + // Verify the data was processed correctly + 
expect(rowCount).toBe(1000000); + + // Memory usage should remain reasonable with proper streaming + const memoryGrowth = + (maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024); + + // Memory growth should be minimal for this size dataset + expect(memoryGrowth).toBeLessThan(20); + + console.log( + `Streaming test: processed ${rowCount} rows, ` + + `memory growth: ${memoryGrowth.toFixed(2)} MB` + ); + }); + + it("stream with pipeline and backpressure", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + // Test with transform streams and pipeline for better backpressure handling + const statement = await connection.executeStream( + "select i as id, 'test_data_' || i::string as data from generate_series(1, 1000000) as i" + ); + + const { data } = await statement.streamResult(); + + let processedCount = 0; + + // Create a simple counting transform + const countingTransform = new stream.Transform({ + objectMode: true, + transform(chunk, encoding, callback) { + processedCount++; + // Pass through the data + this.push(chunk); + callback(); + } + }); + + // Create a collecting writable + const rows: unknown[] = []; + const collectStream = new stream.Writable({ + objectMode: true, + write(chunk, encoding, callback) { + rows.push(chunk); + callback(); + } + }); + + // Use pipeline for proper backpressure handling + await stream.promises.pipeline(data, countingTransform, collectStream); + + // Verify everything worked correctly + expect(processedCount).toBe(1000000); + expect(rows.length).toBe(1000000); + expect(rows[0]).toEqual([1, "test_data_1"]); + + console.log(`Pipeline test: processed ${processedCount} rows successfully`); + }); }); From a21575a33a30ea1e33ad3de28cee085555600821 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Tue, 4 Nov 2025 16:54:06 +0000 Subject: [PATCH 02/11] add extended test --- test/integration/v2/stream.test.ts | 138 
++++++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 1 deletion(-) diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index eb6db8a0..a91026c5 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -1,3 +1,4 @@ +import exp from "node:constants"; import { Firebolt } from "../../../src"; import stream, { TransformCallback } from "node:stream"; @@ -81,7 +82,7 @@ describe("streams", () => { super({ objectMode: true, transform( - row: any, + row: unknown, encoding: BufferEncoding, callback: TransformCallback ) { @@ -250,4 +251,139 @@ describe("streams", () => { console.log(`Pipeline test: processed ${processedCount} rows successfully`); }); + it("stream with different data types and memory management", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + // Generate a query with various data types + const seriesNum = 100000; + const generateLargeResultQuery = (rows: number) => ` + SELECT + i as id, + 'user_' || i::string as username, + 'email_' || i::string || '@example.com' as email, + CASE WHEN i % 2 = 0 THEN 'active' ELSE 'inactive' END as status, + CAST('100000000000000000' as BIGINT) as big_number, + '2024-01-01'::date + (i % 365) as created_date, + RANDOM() * 1000 as score, + 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' 
as description + FROM generate_series(1, ${rows}) as i + `; + + const statement = await connection.executeStream( + generateLargeResultQuery(seriesNum), + { + response: { + normalizeData: true, + bigNumberAsString: false + } + } + ); + + const { data } = await statement.streamResult(); + + // Buffer pool configuration + const poolSize = 8192; // 8KB + const poolBuffer = Buffer.allocUnsafe(poolSize); + const newlineCode = 0x0a; // '\n' character code + + // Track memory usage + const initialMemory = process.memoryUsage(); + let maxMemoryUsed = initialMemory.heapUsed; + let rowCount = 0; + + // Create a JSON transform stream with minimal allocation + const jsonTransform = new stream.Transform({ + objectMode: true, + highWaterMark: 1, // Limit buffering - critical for memory + transform( + row: unknown, + encoding: BufferEncoding, + callback: (error?: Error | null) => void + ) { + try { + rowCount++; + + if (rowCount % 5000 === 0) { + const currentMemory = process.memoryUsage(); + maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed); + } + + // Verify data types are correct for normalized data + if (rowCount === 1) { + const typedRow = row as Record; + expect(typeof typedRow.id).toBe("number"); + expect(typeof typedRow.username).toBe("string"); + expect(typeof typedRow.email).toBe("string"); + expect(typeof typedRow.status).toBe("string"); + expect(typeof typedRow.big_number).toBe("number"); + expect(typedRow.created_date instanceof Date).toBe(true); + expect(typeof typedRow.score).toBe("number"); + expect(typeof typedRow.description).toBe("string"); + } + + const json = JSON.stringify(row); + const jsonLen = Buffer.byteLength(json); + const totalLen = jsonLen + 1; + + let buffer: Buffer; + if (totalLen <= poolSize) { + // Use pool for small rows - no allocation + poolBuffer.write(json, 0, jsonLen); + poolBuffer[jsonLen] = newlineCode; + buffer = poolBuffer.subarray(0, totalLen); + } else { + // Allocate for large rows + buffer = 
Buffer.allocUnsafe(totalLen); + buffer.write(json, 0, jsonLen); + buffer[jsonLen] = newlineCode; + } + + this.push(buffer); + callback(); + } catch (err) { + callback(err as Error); + } + } + }); + + // Create a moderate backpressure stream + let processedChunks = 0; + const outputStream = new stream.Transform({ + highWaterMark: 1, + transform(chunk, encoding, callback) { + processedChunks++; + + // Simulate occasional slow processing with minimal delays + if (processedChunks % 1000 === 0) { + setTimeout(() => { + callback(); + }, 1); // 1ms delay + } else { + callback(); + } + } + }); + + // Use pipeline for proper backpressure handling + await stream.promises.pipeline(data, jsonTransform, outputStream); + + // Verify everything worked correctly + expect(rowCount).toBe(seriesNum); + expect(processedChunks).toBeGreaterThan(0); + + // Memory usage should remain reasonable with proper streaming + const memoryGrowth = + (maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024); + expect(memoryGrowth).toBeLessThan(100); // Allow reasonable memory for complex data types with various field types + + console.log( + `Data types streaming test: processed ${rowCount} rows with various data types, ` + + `memory growth: ${memoryGrowth.toFixed( + 2 + )} MB, processed chunks: ${processedChunks}` + ); + }); }); From db406259c4645aaa79fd6eba0a950694fe29e380 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 5 Nov 2025 16:44:59 +0000 Subject: [PATCH 03/11] fix test --- test/integration/v2/stream.test.ts | 60 ++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index a91026c5..56637f21 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -1,6 +1,7 @@ import exp from "node:constants"; import { Firebolt } from "../../../src"; import stream, { TransformCallback } from "node:stream"; +import BigNumber from "bignumber.js"; const 
connectionParams = { auth: { @@ -264,7 +265,7 @@ describe("streams", () => { i as id, 'user_' || i::string as username, 'email_' || i::string || '@example.com' as email, - CASE WHEN i % 2 = 0 THEN 'active' ELSE 'inactive' END as status, + CASE WHEN i % 2 = 0 THEN true ELSE false END as status, CAST('100000000000000000' as BIGINT) as big_number, '2024-01-01'::date + (i % 365) as created_date, RANDOM() * 1000 as score, @@ -284,6 +285,20 @@ describe("streams", () => { const { data } = await statement.streamResult(); + // Add meta event handler to verify column metadata + data.on("meta", m => { + expect(m).toEqual([ + { name: "id", type: "int" }, + { name: "username", type: "text" }, + { name: "email", type: "text" }, + { name: "status", type: "boolean" }, + { name: "big_number", type: "long" }, + { name: "created_date", type: "date" }, + { name: "score", type: "double" }, + { name: "description", type: "text" } + ]); + }); + // Buffer pool configuration const poolSize = 8192; // 8KB const poolBuffer = Buffer.allocUnsafe(poolSize); @@ -311,17 +326,40 @@ describe("streams", () => { maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed); } - // Verify data types are correct for normalized data + // Verify data types are correct for normalized data on first row if (rowCount === 1) { const typedRow = row as Record; - expect(typeof typedRow.id).toBe("number"); - expect(typeof typedRow.username).toBe("string"); - expect(typeof typedRow.email).toBe("string"); - expect(typeof typedRow.status).toBe("string"); - expect(typeof typedRow.big_number).toBe("number"); - expect(typedRow.created_date instanceof Date).toBe(true); - expect(typeof typedRow.score).toBe("number"); - expect(typeof typedRow.description).toBe("string"); + + // Verify actual values for first row + expect(typedRow.id).toBe(1); + expect(typedRow.username).toBe("user_1"); + expect(typedRow.email).toBe("email_1@example.com"); + expect(typedRow.status).toBe(false); // i=1, 1%2=1, so false + 
expect(typedRow.big_number).toEqual( + new BigNumber("100000000000000000") + ); + expect(typedRow.created_date).toEqual(new Date("2024-01-02")); // 2024-01-01 + 1 day + expect(typedRow.description).toBe( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua." + ); + } + + // Verify data types are correct for normalized data on last row + if (rowCount === seriesNum) { + const typedRow = row as Record; + + // Verify actual values for last row + expect(typedRow.id).toBe(seriesNum); + expect(typedRow.username).toBe(`user_${seriesNum}`); + expect(typedRow.email).toBe(`email_${seriesNum}@example.com`); + expect(typedRow.status).toBe(true); // seriesNum=100000, 100000%2=0, so true + expect(typedRow.big_number).toEqual( + new BigNumber("100000000000000000") + ); + expect(typedRow.created_date).toEqual(new Date("2024-12-21")); // 2024-01-01 + (100000 % 365) = 2024-01-01 + 269 days + expect(typedRow.description).toBe( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua." 
+ ); } const json = JSON.stringify(row); @@ -377,7 +415,7 @@ describe("streams", () => { // Memory usage should remain reasonable with proper streaming const memoryGrowth = (maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024); - expect(memoryGrowth).toBeLessThan(100); // Allow reasonable memory for complex data types with various field types + expect(memoryGrowth).toBeLessThan(120); // Allow reasonable memory for complex data types with various field types console.log( `Data types streaming test: processed ${rowCount} rows with various data types, ` + From 9173f559ce0015d46a96a518ab91b9a0b8f60da4 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Wed, 5 Nov 2025 17:48:12 +0000 Subject: [PATCH 04/11] verify no performance degradation --- src/statement/stream/serverSideStream.ts | 2 +- test/integration/v2/performance.test.ts | 329 +++++++++++++++++++++++ 2 files changed, 330 insertions(+), 1 deletion(-) create mode 100644 test/integration/v2/performance.test.ts diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts index 21eb309c..9eb5c133 100644 --- a/src/statement/stream/serverSideStream.ts +++ b/src/statement/stream/serverSideStream.ts @@ -16,7 +16,7 @@ export class ServerSideStream extends Readable { private finished = false; private processingData = false; private readlineInterfacePaused = false; - private readonly maxPendingRows = 10; // Limit pending rows to prevent memory buildup + private readonly maxPendingRows = 5; // Limit pending rows to prevent memory buildup constructor( private readonly response: Response, diff --git a/test/integration/v2/performance.test.ts b/test/integration/v2/performance.test.ts new file mode 100644 index 00000000..75fb6be0 --- /dev/null +++ b/test/integration/v2/performance.test.ts @@ -0,0 +1,329 @@ +import { Firebolt } from "../../../src/index"; +import * as stream from "stream"; + +const connectionParams = { + auth: { + client_id: process.env.FIREBOLT_CLIENT_ID as string, + client_secret: 
process.env.FIREBOLT_CLIENT_SECRET as string + }, + account: process.env.FIREBOLT_ACCOUNT as string, + database: process.env.FIREBOLT_DATABASE as string, + engineName: process.env.FIREBOLT_ENGINE_NAME as string +}; + +jest.setTimeout(250000); + +describe("performance comparison", () => { + const generateLargeResultQuery = (rows: number) => ` + SELECT + i as id, + 'user_' || i::string as username, + 'email_' || i::string || '@example.com' as email, + CASE WHEN i % 2 = 0 THEN true ELSE false END as status, + CAST('100000000000000000' as BIGINT) as big_number, + '2024-01-01'::date + (i % 365) as created_date, + RANDOM() * 1000 as score, + 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' as description + FROM generate_series(1, ${rows}) as i + `; + + it("compare normal vs streaming execution performance", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + const seriesNum = 100000; + const query = generateLargeResultQuery(seriesNum); + + console.log(`\nTesting performance with ${seriesNum} rows...`); + + // Test normal execution (fetchResult) + const normalStartTime = process.hrtime.bigint(); + + const statement = await connection.execute(query, { + response: { + normalizeData: true, + bigNumberAsString: false + } + }); + + const { data: normalData } = await statement.fetchResult(); + const normalEndTime = process.hrtime.bigint(); + + const normalExecutionTime = Number(normalEndTime - normalStartTime) / 1e6; // Convert to milliseconds + + console.log(`Normal execution: ${normalExecutionTime.toFixed(2)}ms`); + expect(normalData.length).toBe(seriesNum); + + // Test streaming execution (streamResult) + const streamStartTime = process.hrtime.bigint(); + + const streamStatement = await connection.executeStream(query, { + response: { + normalizeData: true, + bigNumberAsString: 
false + } + }); + + const { data: streamData } = await streamStatement.streamResult(); + + let streamRowCount = 0; + + // Process streaming data + const processStreamData = new Promise((resolve, reject) => { + streamData.on("data", () => { + streamRowCount++; + }); + + streamData.on("end", () => { + resolve(); + }); + + streamData.on("error", error => { + reject(error); + }); + }); + + await processStreamData; + const streamEndTime = process.hrtime.bigint(); + + const streamExecutionTime = Number(streamEndTime - streamStartTime) / 1e6; // Convert to milliseconds + + console.log(`Stream execution: ${streamExecutionTime.toFixed(2)}ms`); + expect(streamRowCount).toBe(seriesNum); + + // Performance analysis + const timeDifference = normalExecutionTime - streamExecutionTime; + + console.log(`\nPerformance Analysis:`); + console.log( + `Time difference: ${timeDifference.toFixed(2)}ms (${ + timeDifference > 0 ? "streaming faster" : "normal faster" + })` + ); + console.log( + `Speed ratio: ${(normalExecutionTime / streamExecutionTime).toFixed(2)}x` + ); + + // Verify both methods processed the same number of rows + expect(streamRowCount).toBe(normalData.length); // Same number of rows processed + + // Ensure streaming is not more than 10% slower than normal execution + const maxAllowedStreamTime = normalExecutionTime * 1.1; // 10% slower threshold + expect(streamExecutionTime).toBeLessThanOrEqual(maxAllowedStreamTime); + }); + + it("compare streaming vs normal with pipeline processing", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + const seriesNum = 50000; // Smaller dataset for pipeline comparison + const query = generateLargeResultQuery(seriesNum); + + console.log(`\nTesting pipeline performance with ${seriesNum} rows...`); + + // Test normal execution with manual processing + const normalStartTime = process.hrtime.bigint(); + + const 
statement = await connection.execute(query, { + response: { + normalizeData: true, + bigNumberAsString: false + } + }); + + const { data: normalData } = await statement.fetchResult(); + + // Simulate processing (similar to what streaming pipeline would do) + let processedNormalCount = 0; + const processedNormalData: string[] = []; + for (const row of normalData) { + processedNormalCount++; + processedNormalData.push(JSON.stringify(row)); + } + + const normalEndTime = process.hrtime.bigint(); + + const normalExecutionTime = Number(normalEndTime - normalStartTime) / 1e6; + + console.log(`Normal with processing: ${normalExecutionTime.toFixed(2)}ms`); + + // Test streaming with pipeline processing + const streamStartTime = process.hrtime.bigint(); + + const streamStatement = await connection.executeStream(query, { + response: { + normalizeData: true, + bigNumberAsString: false + } + }); + + const { data: streamData } = await streamStatement.streamResult(); + + let processedStreamCount = 0; + + // Create processing pipeline + const jsonTransform = new stream.Transform({ + objectMode: true, + transform( + row: unknown, + encoding: BufferEncoding, + callback: (error?: Error | null) => void + ) { + try { + processedStreamCount++; + + const json = JSON.stringify(row); + this.push(json); + callback(); + } catch (err) { + callback(err as Error); + } + } + }); + + const processedStreamData: string[] = []; + const collectStream = new stream.Writable({ + objectMode: true, + write(chunk: string, encoding, callback) { + processedStreamData.push(chunk); + callback(); + } + }); + + // Use pipeline for proper backpressure handling + await stream.promises.pipeline(streamData, jsonTransform, collectStream); + + const streamEndTime = process.hrtime.bigint(); + + const streamExecutionTime = Number(streamEndTime - streamStartTime) / 1e6; + + console.log(`Stream with pipeline: ${streamExecutionTime.toFixed(2)}ms`); + + // Verify results + expect(processedStreamCount).toBe(seriesNum); + 
expect(processedNormalCount).toBe(seriesNum); + expect(processedStreamData.length).toBe(processedNormalData.length); + + // Performance analysis + const timeDifference = normalExecutionTime - streamExecutionTime; + + console.log(`\nPipeline Performance Analysis:`); + console.log( + `Time difference: ${timeDifference.toFixed(2)}ms (${ + timeDifference > 0 ? "streaming faster" : "normal faster" + })` + ); + console.log( + `Speed ratio: ${(normalExecutionTime / streamExecutionTime).toFixed(2)}x` + ); + console.log( + `Processing efficiency: ${( + processedStreamCount / processedNormalCount + ).toFixed(2)}x` + ); + + // Verify results + expect(processedStreamCount).toBe(seriesNum); + expect(processedNormalCount).toBe(seriesNum); + expect(processedStreamData.length).toBe(processedNormalData.length); + + // Ensure streaming pipeline is not more than 10% slower than normal execution + const maxAllowedStreamTime = normalExecutionTime * 1.1; // 10% slower threshold + expect(streamExecutionTime).toBeLessThanOrEqual(maxAllowedStreamTime); + }); + + it("execution time comparison with different dataset sizes", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + + const connection = await firebolt.connect(connectionParams); + const testSizes = [10000, 25000, 50000]; // Different dataset sizes + + const results: Array<{ + size: number; + normalTime: number; + streamTime: number; + }> = []; + + for (const size of testSizes) { + console.log(`\nTesting dataset size: ${size} rows`); + const query = generateLargeResultQuery(size); + + // Test normal execution + const normalStartTime = process.hrtime.bigint(); + + const statement = await connection.execute(query, { + response: { normalizeData: true, bigNumberAsString: false } + }); + + const { data: normalData } = await statement.fetchResult(); + const normalEndTime = process.hrtime.bigint(); + + const normalTime = Number(normalEndTime - normalStartTime) / 1e6; + + // Test 
streaming execution + const streamStartTime = process.hrtime.bigint(); + + const streamStatement = await connection.executeStream(query, { + response: { normalizeData: true, bigNumberAsString: false } + }); + + const { data: streamData } = await streamStatement.streamResult(); + + let streamRowCount = 0; + + await new Promise((resolve, reject) => { + streamData.on("data", () => { + streamRowCount++; + }); + + streamData.on("end", resolve); + streamData.on("error", reject); + }); + + const streamEndTime = process.hrtime.bigint(); + const streamTime = Number(streamEndTime - streamStartTime) / 1e6; + + results.push({ + size, + normalTime, + streamTime + }); + + console.log( + `Size ${size}: Normal(${normalTime.toFixed( + 2 + )}ms) vs Stream(${streamTime.toFixed(2)}ms)` + ); + + expect(normalData.length).toBe(size); + expect(streamRowCount).toBe(size); + } + + // Analysis across different sizes + console.log(`\nExecution Time Scaling Analysis:`); + for (const result of results) { + const timeRatio = result.streamTime / result.normalTime; + const timeDifference = result.normalTime - result.streamTime; + + console.log( + `Size ${result.size}: Time difference ${timeDifference.toFixed( + 2 + )}ms, Speed ratio ${timeRatio.toFixed(2)}x` + ); + + // Both methods should complete successfully + expect(result.normalTime).toBeGreaterThan(0); + expect(result.streamTime).toBeGreaterThan(0); + + // Ensure streaming is not more than 10% slower than normal execution for each dataset size + const maxAllowedStreamTime = result.normalTime * 1.1; // 10% slower threshold + expect(result.streamTime).toBeLessThanOrEqual(maxAllowedStreamTime); + } + }); +}); From cb498af89d1854f16d6a6ab91d8e5d63695213ce Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 7 Nov 2025 14:43:42 +0000 Subject: [PATCH 05/11] fix test --- test/integration/v2/stream.test.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 
56637f21..cc101373 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -148,6 +148,12 @@ describe("streams", () => { } ]); }); + + // Consume data to prevent backpressure from blocking error messages + data.on("data", () => { + // Just consume the data, don't need to do anything with it + }); + const [error] = await stream.once(data, "error"); expect(error.message).toEqual( "Result encountered an error: Line 1, Column 9: Division by zero\n" + From 430956782b244442b63ebe9f8d77a949cad3a8ed Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 7 Nov 2025 15:01:53 +0000 Subject: [PATCH 06/11] bump timeout a little --- test/integration/v2/stream.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index cc101373..b25eac87 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -13,7 +13,7 @@ const connectionParams = { engineName: process.env.FIREBOLT_ENGINE_NAME as string }; -jest.setTimeout(250000); +jest.setTimeout(350000); describe("streams", () => { it("check sum from stream result", async () => { From f5df9a2b714f99da636fbd9dfcdc900620b619bf Mon Sep 17 00:00:00 2001 From: ptiurin Date: Fri, 7 Nov 2025 17:58:45 +0000 Subject: [PATCH 07/11] use different output --- test/integration/v2/stream.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index b25eac87..37476b5a 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -395,9 +395,9 @@ describe("streams", () => { // Create a moderate backpressure stream let processedChunks = 0; - const outputStream = new stream.Transform({ + const outputStream = new stream.Writable({ highWaterMark: 1, - transform(chunk, encoding, callback) { + write(chunk, encoding, callback) { processedChunks++; // Simulate occasional slow processing 
with minimal delays From 91b3e62d8adf3a9c212ab03284cc5fbf007af634 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Mon, 10 Nov 2025 09:33:46 +0000 Subject: [PATCH 08/11] refactor out readline --- package.json | 2 +- src/statement/stream/serverSideStream.ts | 198 ++++---- test/integration/v2/stream-data-types.test.ts | 276 +++++++++++ test/integration/v2/stream.test.ts | 173 ------- test/unit/v2/serverSideStream.test.ts | 430 ++++++++++++++++++ 5 files changed, 826 insertions(+), 253 deletions(-) create mode 100644 test/integration/v2/stream-data-types.test.ts create mode 100644 test/unit/v2/serverSideStream.test.ts diff --git a/package.json b/package.json index 1c3e7155..197e7030 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "build": "rm -fr ./build && tsc -p tsconfig.lib.json", "release": "standard-version", "test": "jest", - "test:ci": "jest --ci --bail", + "test:ci": "node --expose-gc ./node_modules/.bin/jest --ci --bail", "type-check": "tsc -p tsconfig.lib.json" }, "prettier": { diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts index 9eb5c133..3cd4a0b1 100644 --- a/src/statement/stream/serverSideStream.ts +++ b/src/statement/stream/serverSideStream.ts @@ -1,6 +1,5 @@ import { Readable } from "stream"; import JSONbig from "json-bigint"; -import readline from "readline"; import { getNormalizedMeta, normalizeResponseRowStreaming @@ -11,75 +10,121 @@ import { Meta } from "../../meta"; export class ServerSideStream extends Readable { private meta: Meta[] = []; - private readlineInterface: readline.Interface | null = null; - private pendingRows: Row[] = []; + private readonly pendingRows: Row[] = []; private finished = false; private processingData = false; - private readlineInterfacePaused = false; + private inputPaused = false; private readonly maxPendingRows = 5; // Limit pending rows to prevent memory buildup + private lineBuffer = ""; + private sourceStream: NodeJS.ReadableStream | null = null; 
constructor( private readonly response: Response, private readonly executeQueryOptions: ExecuteQueryOptions ) { super({ objectMode: true }); - this.setupReadline(); + this.setupInputStream(); } - private setupReadline() { - this.readlineInterface = readline.createInterface({ - input: this.response.body, - crlfDelay: Infinity - }); + private setupInputStream() { + this.sourceStream = this.response.body; - const lineParser = (line: string) => { - try { - if (line.trim()) { - const parsed = JSONbig.parse(line); - if (parsed) { - if (parsed.message_type === "DATA") { - this.handleDataMessage(parsed); - } else if (parsed.message_type === "START") { - this.meta = getNormalizedMeta(parsed.result_columns); - this.emit("meta", this.meta); - } else if (parsed.message_type === "FINISH_SUCCESSFULLY") { - this.finished = true; - this.tryPushPendingData(); - } else if (parsed.message_type === "FINISH_WITH_ERRORS") { - // Ensure readline interface is resumed before destroying to prevent hanging - if (this.readlineInterface && this.readlineInterfacePaused) { - this.readlineInterface.resume(); - this.readlineInterfacePaused = false; - } - this.destroy( - new Error( - `Result encountered an error: ${parsed.errors - .map((error: { description: string }) => error.description) - .join("\n")}` - ) - ); - } - } else { - this.destroy(new Error(`Result row could not be parsed: ${line}`)); - } - } - } catch (err) { - this.destroy(err); - } - }; + if (!this.sourceStream) { + this.destroy(new Error("Response body is null or undefined")); + return; + } - this.readlineInterface.on("line", lineParser); + this.sourceStream.on("data", (chunk: Buffer) => { + this.handleData(chunk); + }); - this.readlineInterface.on("close", () => { - this.finished = true; - this.tryPushPendingData(); + this.sourceStream.on("end", () => { + this.handleInputEnd(); }); - this.readlineInterface.on("error", err => { + this.sourceStream.on("error", (err: Error) => { this.destroy(err); }); } + private handleData(chunk: 
Buffer) { + // Convert chunk to string and add to line buffer + this.lineBuffer += chunk.toString(); + + // Process complete lines + let lineStart = 0; + let lineEnd = this.lineBuffer.indexOf("\n", lineStart); + + while (lineEnd !== -1) { + const line = this.lineBuffer.slice(lineStart, lineEnd); + this.processLine(line.trim()); + + lineStart = lineEnd + 1; + lineEnd = this.lineBuffer.indexOf("\n", lineStart); + } + + // Keep remaining partial line in buffer + this.lineBuffer = this.lineBuffer.slice(lineStart); + + // Apply backpressure if we have too many pending rows + if ( + this.pendingRows.length > this.maxPendingRows && + this.sourceStream && + !this.inputPaused && + !this.processingData + ) { + this.sourceStream.pause(); + this.inputPaused = true; + } + } + + private handleInputEnd() { + // Process any remaining line in buffer + if (this.lineBuffer.trim()) { + this.processLine(this.lineBuffer.trim()); + this.lineBuffer = ""; + } + + this.finished = true; + this.tryPushPendingData(); + } + + private processLine(line: string) { + if (!line) return; + + try { + const parsed = JSONbig.parse(line); + if (parsed) { + if (parsed.message_type === "DATA") { + this.handleDataMessage(parsed); + } else if (parsed.message_type === "START") { + this.meta = getNormalizedMeta(parsed.result_columns); + this.emit("meta", this.meta); + } else if (parsed.message_type === "FINISH_SUCCESSFULLY") { + this.finished = true; + this.tryPushPendingData(); + } else if (parsed.message_type === "FINISH_WITH_ERRORS") { + // Ensure source stream is resumed before destroying to prevent hanging + if (this.sourceStream && this.inputPaused) { + this.sourceStream.resume(); + this.inputPaused = false; + } + this.destroy( + new Error( + `Result encountered an error: ${parsed.errors + .map((error: { description: string }) => error.description) + .join("\n")}` + ) + ); + } + } else { + this.destroy(new Error(`Result row could not be parsed: ${line}`)); + } + } catch (err) { + this.destroy(err); + } + 
} + private handleDataMessage(parsed: { data: unknown[] }) { if (parsed.data) { // Process rows one by one to handle backpressure properly @@ -92,18 +137,6 @@ export class ServerSideStream extends Readable { // Add to pending rows buffer this.pendingRows.push(...normalizedData); - // If we have too many pending rows, pause the readline interface to apply backpressure - // Only pause if we're not already processing and have significantly exceeded the limit - if ( - this.pendingRows.length > this.maxPendingRows && - this.readlineInterface && - !this.readlineInterfacePaused && - !this.processingData - ) { - this.readlineInterface.pause(); - this.readlineInterfacePaused = true; - } - // Try to push data immediately if not already processing if (!this.processingData) { this.tryPushPendingData(); @@ -122,14 +155,14 @@ export class ServerSideStream extends Readable { const row = this.pendingRows.shift(); const canContinue = this.push(row); - // If pending rows dropped below threshold, resume the readline interface + // If pending rows dropped below threshold, resume the source stream if ( this.pendingRows.length <= this.maxPendingRows / 4 && - this.readlineInterface && - this.readlineInterfacePaused + this.sourceStream && + this.inputPaused ) { - this.readlineInterface.resume(); - this.readlineInterfacePaused = false; + this.sourceStream.resume(); + this.inputPaused = false; } // If push returns false, stop pushing and wait for _read to be called @@ -155,26 +188,33 @@ export class ServerSideStream extends Readable { this.tryPushPendingData(); } - // Also resume readline interface if it was paused and we have capacity + // Also resume source stream if it was paused and we have capacity if ( - this.readlineInterface && - this.readlineInterfacePaused && + this.sourceStream && + this.inputPaused && this.pendingRows.length < this.maxPendingRows / 2 ) { - this.readlineInterface.resume(); - this.readlineInterfacePaused = false; + this.sourceStream.resume(); + this.inputPaused = 
false; } } _destroy(err: Error | null, callback: (error?: Error | null) => void) { - if (this.readlineInterface) { - // Resume interface if paused to ensure proper cleanup - if (this.readlineInterfacePaused) { - this.readlineInterface.resume(); - this.readlineInterfacePaused = false; + if (this.sourceStream) { + // Resume stream if paused to ensure proper cleanup + if (this.inputPaused) { + this.sourceStream.resume(); + this.inputPaused = false; + } + + // Only call destroy if it exists (for Node.js streams) + const destroyableStream = this.sourceStream as unknown as { + destroy?: () => void; + }; + if (typeof destroyableStream.destroy === "function") { + destroyableStream.destroy(); } - this.readlineInterface.close(); - this.readlineInterface = null; + this.sourceStream = null; } callback(err); } diff --git a/test/integration/v2/stream-data-types.test.ts b/test/integration/v2/stream-data-types.test.ts new file mode 100644 index 00000000..ad6d624a --- /dev/null +++ b/test/integration/v2/stream-data-types.test.ts @@ -0,0 +1,276 @@ +import { Firebolt } from "../../../src"; +import stream from "node:stream"; +import fs from "node:fs"; +import path from "node:path"; +import os from "node:os"; +import BigNumber from "bignumber.js"; + +const connectionParams = { + auth: { + client_id: process.env.FIREBOLT_CLIENT_ID as string, + client_secret: process.env.FIREBOLT_CLIENT_SECRET as string + }, + account: process.env.FIREBOLT_ACCOUNT as string, + database: process.env.FIREBOLT_DATABASE as string, + engineName: process.env.FIREBOLT_ENGINE_NAME as string +}; + +jest.setTimeout(350000); + +describe("advanced stream tests", () => { + it("stream with different data types and memory management", async () => { + const firebolt = Firebolt({ + apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string + }); + const connection = await firebolt.connect(connectionParams); + + // Generate a query with various data types + const seriesNum = 100000; + const generateLargeResultQuery = (rows: 
number) => ` + SELECT + i as id, + 'user_' || i::string as username, + 'email_' || i::string || '@example.com' as email, + CASE WHEN i % 2 = 0 THEN true ELSE false END as status, + CAST('100000000000000000' as BIGINT) as big_number, + '2024-01-01'::date + (i % 365) as created_date, + RANDOM() * 1000 as score, + 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' as description + FROM generate_series(1, ${rows}) as i + `; + + const statement = await connection.executeStream( + generateLargeResultQuery(seriesNum), + { + response: { + normalizeData: true, + bigNumberAsString: false + } + } + ); + + const { data } = await statement.streamResult(); + + // Add meta event handler to verify column metadata + data.on("meta", m => { + expect(m).toEqual([ + { name: "id", type: "int" }, + { name: "username", type: "text" }, + { name: "email", type: "text" }, + { name: "status", type: "boolean" }, + { name: "big_number", type: "long" }, + { name: "created_date", type: "date" }, + { name: "score", type: "double" }, + { name: "description", type: "text" } + ]); + }); + + // Buffer pool configuration + const poolSize = 8192; // 8KB + const poolBuffer = Buffer.allocUnsafe(poolSize); + const newlineCode = 0x0a; // '\n' character code + + // Track memory usage + const initialMemory = process.memoryUsage(); + let maxMemoryUsed = initialMemory.heapUsed; + let rowCount = 0; + + // Create a JSON transform stream with minimal allocation + const jsonTransform = new stream.Transform({ + objectMode: true, + highWaterMark: 1, // Limit buffering - critical for memory + transform( + row: unknown, + encoding: BufferEncoding, + callback: (error?: Error | null) => void + ) { + try { + rowCount++; + + if (rowCount % 5000 === 0) { + const currentMemory = process.memoryUsage(); + maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed); + if (global.gc) { + console.log(`Invoking GC at row ${rowCount}`); + global.gc(); + 
} + } + + // Verify data types are correct for normalized data on first row + if (rowCount === 1) { + const typedRow = row as Record<string, unknown>; + + // Verify actual values for first row + expect(typedRow.id).toBe(1); + expect(typedRow.username).toBe("user_1"); + expect(typedRow.email).toBe("email_1@example.com"); + expect(typedRow.status).toBe(false); // i=1, 1%2=1, so false + expect(typedRow.big_number).toEqual( + new BigNumber("100000000000000000") + ); + expect(typedRow.created_date).toEqual(new Date("2024-01-02")); // 2024-01-01 + 1 day + expect(typedRow.description).toBe( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua." + ); + } + + // Verify data types are correct for normalized data on last row + if (rowCount === seriesNum) { + const typedRow = row as Record<string, unknown>; + + // Verify actual values for last row + expect(typedRow.id).toBe(seriesNum); + expect(typedRow.username).toBe(`user_${seriesNum}`); + expect(typedRow.email).toBe(`email_${seriesNum}@example.com`); + expect(typedRow.status).toBe(true); // seriesNum=100000, 100000%2=0, so true + expect(typedRow.big_number).toEqual( + new BigNumber("100000000000000000") + ); + expect(typedRow.created_date).toEqual(new Date("2024-12-21")); // 2024-01-01 + (100000 % 365) = 2024-01-01 + 355 days + expect(typedRow.description).toBe( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
+ ); + } + + const json = JSON.stringify(row); + const jsonLen = Buffer.byteLength(json); + const totalLen = jsonLen + 1; + + let buffer: Buffer; + if (totalLen <= poolSize) { + // Use pool for small rows - no allocation + poolBuffer.write(json, 0, jsonLen); + poolBuffer[jsonLen] = newlineCode; + buffer = poolBuffer.subarray(0, totalLen); + } else { + // Allocate for large rows + buffer = Buffer.allocUnsafe(totalLen); + buffer.write(json, 0, jsonLen); + buffer[jsonLen] = newlineCode; + } + + this.push(buffer); + callback(); + } catch (err) { + callback(err as Error); + } + } + }); + + // Create a batch processing stream that writes to a temporary file + let processedChunks = 0; + const batchSize = 100; + let batchBuffer: string[] = []; + + // Create a temporary file for writing + const tempDir = os.tmpdir(); + const tempFilePath = path.join( + tempDir, + `firebolt-stream-test-${Date.now()}.jsonl` + ); + const fileWriteStream = fs.createWriteStream(tempFilePath, { flags: "w" }); + + const outputStream = new stream.Writable({ + highWaterMark: 1, // Reasonable buffer size + write(chunk: Buffer, encoding, callback) { + processedChunks++; + const chunkStr = chunk.toString(); + batchBuffer.push(chunkStr); + + // Process in batches to create natural backpressure patterns + if (batchBuffer.length >= batchSize) { + // Process the batch synchronously (simulate some work) + const batchData = batchBuffer.join(""); + const lines = batchData.split("\n").filter(line => line.trim()); + + // Write valid JSON lines to the file + for (const line of lines) { + try { + JSON.parse(line); // Verify it's valid JSON + fileWriteStream.write(line + "\n"); + } catch (e) { + // Skip invalid JSON lines + } + } + + // Clear the batch + batchBuffer = []; + } + + callback(); + }, + + final(callback) { + // Process any remaining items in the final batch + if (batchBuffer.length > 0) { + const batchData = batchBuffer.join(""); + const lines = batchData.split("\n").filter(line => line.trim()); + + // 
Write remaining valid JSON lines to the file + for (const line of lines) { + try { + JSON.parse(line); // Verify it's valid JSON + fileWriteStream.write(line + "\n"); + } catch (e) { + // Skip invalid JSON lines + } + } + batchBuffer = []; + } + + // Close the file stream + fileWriteStream.end(() => { + callback(); + }); + } + }); + + // Use pipeline for proper backpressure handling + await stream.promises.pipeline(data, jsonTransform, outputStream); + + // Verify everything worked correctly + expect(rowCount).toBe(seriesNum); + expect(processedChunks).toBeGreaterThan(0); + + // Verify the file was created and has content + expect(fs.existsSync(tempFilePath)).toBe(true); + const fileStats = fs.statSync(tempFilePath); + expect(fileStats.size).toBeGreaterThan(0); + + // Read a few lines from the file to verify JSON format + const fileContent = fs.readFileSync(tempFilePath, "utf-8"); + const lines = fileContent.split("\n").filter(line => line.trim()); + expect(lines.length).toBeGreaterThan(0); + + // Verify first line is valid JSON + if (lines.length > 0) { + const firstRow = JSON.parse(lines[0]); + expect(firstRow).toHaveProperty("id"); + expect(firstRow).toHaveProperty("username"); + } + + // Clean up the temporary file + fs.unlinkSync(tempFilePath); + + // Memory usage should remain reasonable with proper streaming + const memoryGrowth = + (maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024); + + if (global.gc) { + expect(memoryGrowth).toBeLessThan(30); + } else { + expect(memoryGrowth).toBeLessThan(100); + } + + console.log( + `Data types streaming test: processed ${rowCount} rows with various data types, ` + + `memory growth: ${memoryGrowth.toFixed( + 2 + )} MB, processed chunks: ${processedChunks}, file size: ${( + fileStats.size / + 1024 / + 1024 + ).toFixed(2)} MB` + ); + }); +}); diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 37476b5a..26473547 100644 --- a/test/integration/v2/stream.test.ts +++ 
b/test/integration/v2/stream.test.ts @@ -1,7 +1,6 @@ import exp from "node:constants"; import { Firebolt } from "../../../src"; import stream, { TransformCallback } from "node:stream"; -import BigNumber from "bignumber.js"; const connectionParams = { auth: { @@ -258,176 +257,4 @@ describe("streams", () => { console.log(`Pipeline test: processed ${processedCount} rows successfully`); }); - it("stream with different data types and memory management", async () => { - const firebolt = Firebolt({ - apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string - }); - const connection = await firebolt.connect(connectionParams); - - // Generate a query with various data types - const seriesNum = 100000; - const generateLargeResultQuery = (rows: number) => ` - SELECT - i as id, - 'user_' || i::string as username, - 'email_' || i::string || '@example.com' as email, - CASE WHEN i % 2 = 0 THEN true ELSE false END as status, - CAST('100000000000000000' as BIGINT) as big_number, - '2024-01-01'::date + (i % 365) as created_date, - RANDOM() * 1000 as score, - 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' 
as description - FROM generate_series(1, ${rows}) as i - `; - - const statement = await connection.executeStream( - generateLargeResultQuery(seriesNum), - { - response: { - normalizeData: true, - bigNumberAsString: false - } - } - ); - - const { data } = await statement.streamResult(); - - // Add meta event handler to verify column metadata - data.on("meta", m => { - expect(m).toEqual([ - { name: "id", type: "int" }, - { name: "username", type: "text" }, - { name: "email", type: "text" }, - { name: "status", type: "boolean" }, - { name: "big_number", type: "long" }, - { name: "created_date", type: "date" }, - { name: "score", type: "double" }, - { name: "description", type: "text" } - ]); - }); - - // Buffer pool configuration - const poolSize = 8192; // 8KB - const poolBuffer = Buffer.allocUnsafe(poolSize); - const newlineCode = 0x0a; // '\n' character code - - // Track memory usage - const initialMemory = process.memoryUsage(); - let maxMemoryUsed = initialMemory.heapUsed; - let rowCount = 0; - - // Create a JSON transform stream with minimal allocation - const jsonTransform = new stream.Transform({ - objectMode: true, - highWaterMark: 1, // Limit buffering - critical for memory - transform( - row: unknown, - encoding: BufferEncoding, - callback: (error?: Error | null) => void - ) { - try { - rowCount++; - - if (rowCount % 5000 === 0) { - const currentMemory = process.memoryUsage(); - maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed); - } - - // Verify data types are correct for normalized data on first row - if (rowCount === 1) { - const typedRow = row as Record; - - // Verify actual values for first row - expect(typedRow.id).toBe(1); - expect(typedRow.username).toBe("user_1"); - expect(typedRow.email).toBe("email_1@example.com"); - expect(typedRow.status).toBe(false); // i=1, 1%2=1, so false - expect(typedRow.big_number).toEqual( - new BigNumber("100000000000000000") - ); - expect(typedRow.created_date).toEqual(new Date("2024-01-02")); // 
2024-01-01 + 1 day - expect(typedRow.description).toBe( - "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua." - ); - } - - // Verify data types are correct for normalized data on last row - if (rowCount === seriesNum) { - const typedRow = row as Record; - - // Verify actual values for last row - expect(typedRow.id).toBe(seriesNum); - expect(typedRow.username).toBe(`user_${seriesNum}`); - expect(typedRow.email).toBe(`email_${seriesNum}@example.com`); - expect(typedRow.status).toBe(true); // seriesNum=100000, 100000%2=0, so true - expect(typedRow.big_number).toEqual( - new BigNumber("100000000000000000") - ); - expect(typedRow.created_date).toEqual(new Date("2024-12-21")); // 2024-01-01 + (100000 % 365) = 2024-01-01 + 269 days - expect(typedRow.description).toBe( - "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua." - ); - } - - const json = JSON.stringify(row); - const jsonLen = Buffer.byteLength(json); - const totalLen = jsonLen + 1; - - let buffer: Buffer; - if (totalLen <= poolSize) { - // Use pool for small rows - no allocation - poolBuffer.write(json, 0, jsonLen); - poolBuffer[jsonLen] = newlineCode; - buffer = poolBuffer.subarray(0, totalLen); - } else { - // Allocate for large rows - buffer = Buffer.allocUnsafe(totalLen); - buffer.write(json, 0, jsonLen); - buffer[jsonLen] = newlineCode; - } - - this.push(buffer); - callback(); - } catch (err) { - callback(err as Error); - } - } - }); - - // Create a moderate backpressure stream - let processedChunks = 0; - const outputStream = new stream.Writable({ - highWaterMark: 1, - write(chunk, encoding, callback) { - processedChunks++; - - // Simulate occasional slow processing with minimal delays - if (processedChunks % 1000 === 0) { - setTimeout(() => { - callback(); - }, 1); // 1ms delay - } else { - callback(); - } - } - }); - - // Use pipeline for proper backpressure 
handling - await stream.promises.pipeline(data, jsonTransform, outputStream); - - // Verify everything worked correctly - expect(rowCount).toBe(seriesNum); - expect(processedChunks).toBeGreaterThan(0); - - // Memory usage should remain reasonable with proper streaming - const memoryGrowth = - (maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024); - expect(memoryGrowth).toBeLessThan(120); // Allow reasonable memory for complex data types with various field types - - console.log( - `Data types streaming test: processed ${rowCount} rows with various data types, ` + - `memory growth: ${memoryGrowth.toFixed( - 2 - )} MB, processed chunks: ${processedChunks}` - ); - }); }); diff --git a/test/unit/v2/serverSideStream.test.ts b/test/unit/v2/serverSideStream.test.ts new file mode 100644 index 00000000..d23963a7 --- /dev/null +++ b/test/unit/v2/serverSideStream.test.ts @@ -0,0 +1,430 @@ +import { PassThrough } from "stream"; +import { Response } from "node-fetch"; +import { ServerSideStream } from "../../../src/statement/stream/serverSideStream"; +import { ExecuteQueryOptions } from "../../../src/types"; + +describe("ServerSideStream", () => { + let mockResponse: Partial<Response>; + let executeQueryOptions: ExecuteQueryOptions; + + beforeEach(() => { + mockResponse = {}; + executeQueryOptions = {}; + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe("backpressure and pause/resume functionality", () => { + it("should pause source stream when pending rows exceed maxPendingRows threshold", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + let pauseCalled = false; + let pauseCallCount = 0; + + // Mock the pause method on the response body (this is what ServerSideStream calls) + sourceStream.pause = jest.fn(() => { + pauseCalled = true; + pauseCallCount++; + return sourceStream; + }); + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + // Mock push to return false
immediately to create backpressure from the start + // This should cause rows to accumulate in pendingRows + serverSideStream.push = jest.fn((_chunk: unknown) => { + // Always return false to simulate constant backpressure + return false; + }); + + serverSideStream.on("error", done); + + // First send the START message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send fewer data messages but with multiple rows each + // Each data message can contain multiple rows in the data array + let dataMessages = ""; + for (let i = 0; i < 3; i++) { + dataMessages += + JSON.stringify({ + message_type: "DATA", + data: [[i * 3 + 0], [i * 3 + 1], [i * 3 + 2]] + }) + "\n"; + } + sourceStream.write(dataMessages); + + // Give the stream time to process and check if pause was called + setTimeout(() => { + expect(pauseCalled).toBe(true); + expect(pauseCallCount).toBeGreaterThan(0); + done(); + }, 100); + }); + + it("should resume source stream when pending rows drop below threshold", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + let pauseCalled = false; + let resumeCalled = false; + + // Set up mocks AFTER ServerSideStream is created to avoid initialization issues + sourceStream.pause = jest.fn(() => { + pauseCalled = true; + return sourceStream; + }); + + sourceStream.resume = jest.fn(() => { + resumeCalled = true; + return sourceStream; + }); + + serverSideStream.on("error", done); + + // Mock push to always return false to accumulate rows and trigger pause + serverSideStream.push = jest.fn(() => { + return false; + }); + + // Send START message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send multiple DATA messages to exceed maxPendingRows 
threshold + let dataMessages = ""; + for (let i = 0; i < 3; i++) { + dataMessages += + JSON.stringify({ + message_type: "DATA", + data: [[i * 3 + 0], [i * 3 + 1], [i * 3 + 2]] + }) + "\n"; + } + sourceStream.write(dataMessages); + + setTimeout(() => { + expect(pauseCalled).toBe(true); + + // Now test resume: change push to return true (allow processing) + serverSideStream.push = jest.fn(() => true); + + // Call _read to trigger the resume condition check + serverSideStream._read(); + + setTimeout(() => { + expect(resumeCalled).toBe(true); + done(); + }, 50); + }, 100); + }); + + it("should properly clean up on stream destruction", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + let resumeCalled = false; + + // Mock resume to track cleanup + sourceStream.resume = jest.fn(() => { + resumeCalled = true; + return sourceStream; + }); + + // Mock destroy method + sourceStream.destroy = jest.fn(); + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + // First pause the stream by creating backpressure + serverSideStream.push = jest.fn((_chunk: unknown) => { + return false; // Always return false to simulate backpressure + }); + + // Send start message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send data to trigger pause + let dataMessages = ""; + for (let i = 0; i < 3; i++) { + dataMessages += + JSON.stringify({ + message_type: "DATA", + data: [[i * 3 + 0], [i * 3 + 1], [i * 3 + 2]] + }) + "\n"; + } + sourceStream.write(dataMessages); + + setTimeout(() => { + // Destroy the stream + serverSideStream.destroy(); + + // Check that cleanup happened + expect(resumeCalled).toBe(true); + expect(sourceStream.destroy).toHaveBeenCalled(); + done(); + }, 50); + }); + + it("should handle error messages and cleanup properly", done => { + const sourceStream = new PassThrough(); + 
mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + serverSideStream.on("error", (error: Error) => { + // Check that error handling resumed the stream for cleanup + expect(error.message).toContain("Result encountered an error"); + done(); + }); + + serverSideStream.on("data", () => { + // No-op to consume data + }); + + // Send start message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + let dataMessages = ""; + for (let i = 0; i < 3; i++) { + dataMessages += + JSON.stringify({ + message_type: "DATA", + data: [[i * 3 + 0]] + }) + "\n"; + } + + // Send error message + setTimeout(() => { + sourceStream.write(dataMessages); + sourceStream.write( + JSON.stringify({ + message_type: "FINISH_WITH_ERRORS", + errors: [{ description: "Test error" }] + }) + "\n" + ); + }, 500); + }); + }); + + describe("basic functionality", () => { + it("should emit meta event on START message", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + let metaReceived = false; + + serverSideStream.on("meta", (meta: unknown) => { + metaReceived = true; + expect(meta).toBeDefined(); + expect(Array.isArray(meta)).toBe(true); + }); + + serverSideStream.on("error", done); + + // Add data event handler to consume the stream + serverSideStream.on("data", () => { + // No data expected, but this ensures stream consumption + }); + + // Send START message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send FINISH message to complete + sourceStream.write( + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + "\n" + ); + + sourceStream.end(); + + serverSideStream.on("end", () => { + 
expect(metaReceived).toBe(true); + done(); + }); + }); + + it("should emit data events for DATA messages", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + const dataEvents: unknown[] = []; + + serverSideStream.on("data", (data: unknown) => { + dataEvents.push(data); + }); + + serverSideStream.on("error", done); + + // Send START message + sourceStream.write( + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n" + ); + + // Send DATA messages + sourceStream.write( + JSON.stringify({ + message_type: "DATA", + data: [[1], [2], [3]] + }) + "\n" + ); + + // Send FINISH message + sourceStream.write( + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + "\n" + ); + + sourceStream.end(); + + serverSideStream.on("end", () => { + expect(dataEvents).toHaveLength(3); + done(); + }); + }); + + it("should handle null response body", done => { + mockResponse.body = undefined; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + serverSideStream.on("error", (error: Error) => { + expect(error.message).toBe("Response body is null or undefined"); + done(); + }); + }); + + it("should handle malformed JSON gracefully", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + + serverSideStream.on("error", (error: Error) => { + expect(error).toBeDefined(); + done(); + }); + + // Send malformed JSON + sourceStream.write("{ invalid json }\n"); + }); + + it("should handle partial lines in buffer correctly", done => { + const sourceStream = new PassThrough(); + mockResponse.body = sourceStream; + + const serverSideStream = new ServerSideStream( + mockResponse as never, + executeQueryOptions + ); + 
+ let metaReceived = false; + let dataReceived = false; + + serverSideStream.on("meta", () => { + metaReceived = true; + }); + + serverSideStream.on("data", () => { + dataReceived = true; + }); + + serverSideStream.on("end", () => { + expect(metaReceived).toBe(true); + expect(dataReceived).toBe(true); + done(); + }); + + serverSideStream.on("error", done); + + // Send data in chunks that split JSON lines + const startMessage = + JSON.stringify({ + message_type: "START", + result_columns: [{ name: "id", type: "integer" }] + }) + "\n"; + + const dataMessage = + JSON.stringify({ + message_type: "DATA", + data: [[1]] + }) + "\n"; + + const finishMessage = + JSON.stringify({ + message_type: "FINISH_SUCCESSFULLY" + }) + "\n"; + + // Split messages across multiple chunks + const fullMessage = startMessage + dataMessage + finishMessage; + const midPoint = Math.floor(fullMessage.length / 2); + + sourceStream.write(fullMessage.slice(0, midPoint)); + setTimeout(() => { + sourceStream.write(fullMessage.slice(midPoint)); + sourceStream.end(); + }, 10); + }); + }); +}); From d65edcc479be4b0f87b8dd1745b467f0fae1d952 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Tue, 18 Nov 2025 09:03:26 +0000 Subject: [PATCH 09/11] Add sum verification to tests --- test/integration/v2/stream-data-types.test.ts | 10 ++++++++++ test/integration/v2/stream.test.ts | 8 +++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/test/integration/v2/stream-data-types.test.ts b/test/integration/v2/stream-data-types.test.ts index ad6d624a..d4e4b343 100644 --- a/test/integration/v2/stream-data-types.test.ts +++ b/test/integration/v2/stream-data-types.test.ts @@ -74,6 +74,7 @@ describe("advanced stream tests", () => { const initialMemory = process.memoryUsage(); let maxMemoryUsed = initialMemory.heapUsed; let rowCount = 0; + let idSum = 0; // Track sum of id column for data integrity verification // Create a JSON transform stream with minimal allocation const jsonTransform = new stream.Transform({ 
@@ -87,6 +88,11 @@ describe("advanced stream tests", () => { try { rowCount++; + // Add the id to our sum for data integrity verification + const typedRow = row as Record; + const id = typedRow.id as number; + idSum += id; + if (rowCount % 5000 === 0) { const currentMemory = process.memoryUsage(); maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed); @@ -232,6 +238,10 @@ describe("advanced stream tests", () => { expect(rowCount).toBe(seriesNum); expect(processedChunks).toBeGreaterThan(0); + // Verify data integrity: sum of 1 to N should be N*(N+1)/2 + const expectedSum = (seriesNum * (seriesNum + 1)) / 2; + expect(idSum).toBe(expectedSum); + // Verify the file was created and has content expect(fs.existsSync(tempFilePath)).toBe(true); const fileStats = fs.statSync(tempFilePath); diff --git a/test/integration/v2/stream.test.ts b/test/integration/v2/stream.test.ts index 26473547..095c0237 100644 --- a/test/integration/v2/stream.test.ts +++ b/test/integration/v2/stream.test.ts @@ -177,10 +177,12 @@ describe("streams", () => { const initialMemory = process.memoryUsage(); let maxMemoryUsed = initialMemory.heapUsed; let rowCount = 0; + let idSum = 0; // Process data with simple event handlers (like existing tests) - data.on("data", () => { + data.on("data", row => { rowCount++; + idSum += row[0]; // Sum the id values for data integrity // Track memory usage periodically if (rowCount % 50 === 0) { @@ -197,6 +199,10 @@ describe("streams", () => { // Verify the data was processed correctly expect(rowCount).toBe(1000000); + + // Verify data integrity with sum check + const expectedSum = (1000000 * 1000001) / 2; // Sum of 1 to 1,000,000 + expect(idSum).toBe(expectedSum); // Memory usage should remain reasonable with proper streaming const memoryGrowth = From 38a92c96e77d670796b63cd0ad90b08876128df7 Mon Sep 17 00:00:00 2001 From: ptiurin Date: Tue, 18 Nov 2025 13:27:23 +0000 Subject: [PATCH 10/11] remove duplicate resume --- src/statement/stream/serverSideStream.ts 
| 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts index 3cd4a0b1..cca244f6 100644 --- a/src/statement/stream/serverSideStream.ts +++ b/src/statement/stream/serverSideStream.ts @@ -155,16 +155,6 @@ export class ServerSideStream extends Readable { const row = this.pendingRows.shift(); const canContinue = this.push(row); - // If pending rows dropped below threshold, resume the source stream - if ( - this.pendingRows.length <= this.maxPendingRows / 4 && - this.sourceStream && - this.inputPaused - ) { - this.sourceStream.resume(); - this.inputPaused = false; - } - // If push returns false, stop pushing and wait for _read to be called if (!canContinue) { this.processingData = false; @@ -192,7 +182,7 @@ export class ServerSideStream extends Readable { if ( this.sourceStream && this.inputPaused && - this.pendingRows.length < this.maxPendingRows / 2 + this.pendingRows.length < this.maxPendingRows / 4 ) { this.sourceStream.resume(); this.inputPaused = false; From a4996071216ab1d550f49b6013957bbc00fc046a Mon Sep 17 00:00:00 2001 From: ptiurin Date: Tue, 18 Nov 2025 16:11:06 +0000 Subject: [PATCH 11/11] rename threshold --- src/statement/stream/serverSideStream.ts | 6 +- test/unit/v2/serverSideStream.test.ts | 130 ++++++++++++----------- 2 files changed, 72 insertions(+), 64 deletions(-) diff --git a/src/statement/stream/serverSideStream.ts b/src/statement/stream/serverSideStream.ts index cca244f6..c51eacc6 100644 --- a/src/statement/stream/serverSideStream.ts +++ b/src/statement/stream/serverSideStream.ts @@ -14,7 +14,7 @@ export class ServerSideStream extends Readable { private finished = false; private processingData = false; private inputPaused = false; - private readonly maxPendingRows = 5; // Limit pending rows to prevent memory buildup + private readonly bufferGrowthThreshold = 10; // Stop adding to buffer when over this many rows are already in private 
lineBuffer = ""; private sourceStream: NodeJS.ReadableStream | null = null; @@ -68,7 +68,7 @@ export class ServerSideStream extends Readable { // Apply backpressure if we have too many pending rows if ( - this.pendingRows.length > this.maxPendingRows && + this.pendingRows.length > this.bufferGrowthThreshold && this.sourceStream && !this.inputPaused && !this.processingData @@ -182,7 +182,7 @@ export class ServerSideStream extends Readable { if ( this.sourceStream && this.inputPaused && - this.pendingRows.length < this.maxPendingRows / 4 + this.pendingRows.length < this.bufferGrowthThreshold ) { this.sourceStream.resume(); this.inputPaused = false; diff --git a/test/unit/v2/serverSideStream.test.ts b/test/unit/v2/serverSideStream.test.ts index d23963a7..91da0897 100644 --- a/test/unit/v2/serverSideStream.test.ts +++ b/test/unit/v2/serverSideStream.test.ts @@ -17,17 +17,15 @@ describe("ServerSideStream", () => { }); describe("backpressure and pause/resume functionality", () => { - it("should pause source stream when pending rows exceed maxPendingRows threshold", done => { + it("should pause source stream when pending rows exceed bufferGrowthThreshold", done => { const sourceStream = new PassThrough(); mockResponse.body = sourceStream; let pauseCalled = false; - let pauseCallCount = 0; - // Mock the pause method on the response body (this is what ServerSideStream calls) + // Mock the pause method sourceStream.pause = jest.fn(() => { pauseCalled = true; - pauseCallCount++; return sourceStream; }); @@ -36,16 +34,9 @@ describe("ServerSideStream", () => { executeQueryOptions ); - // Mock push to return false immediately to create backpressure from the start - // This should cause rows to accumulate in pendingRows - serverSideStream.push = jest.fn((_chunk: unknown) => { - // Always return false to simulate constant backpressure - return false; - }); - serverSideStream.on("error", done); - // First send the START message + // Send START message first sourceStream.write( 
JSON.stringify({ message_type: "START", @@ -53,24 +44,32 @@ describe("ServerSideStream", () => { }) + "\n" ); - // Send fewer data messages but with multiple rows each - // Each data message can contain multiple rows in the data array - let dataMessages = ""; - for (let i = 0; i < 3; i++) { - dataMessages += + // Wait for start message to be processed, then setup backpressure + setTimeout(() => { + // Mock push to always return false to create backpressure + const originalPush = serverSideStream.push.bind(serverSideStream); + serverSideStream.push = jest.fn(chunk => { + if (chunk !== null) { + return false; // Always return false for data chunks + } + return originalPush(chunk); // Allow null (end) to pass through + }); + + // Send a large single data message with many rows to ensure we exceed threshold + const largeDataMessage = JSON.stringify({ message_type: "DATA", - data: [[i * 3 + 0], [i * 3 + 1], [i * 3 + 2]] + data: Array.from({ length: 15 }, (_, i) => [i]) // 15 rows, exceeds threshold of 10 }) + "\n"; - } - sourceStream.write(dataMessages); - // Give the stream time to process and check if pause was called - setTimeout(() => { - expect(pauseCalled).toBe(true); - expect(pauseCallCount).toBeGreaterThan(0); - done(); - }, 100); + sourceStream.write(largeDataMessage); + + // Check after a delay + setTimeout(() => { + expect(pauseCalled).toBe(true); + done(); + }, 50); + }, 10); }); it("should resume source stream when pending rows drop below threshold", done => { @@ -98,12 +97,7 @@ describe("ServerSideStream", () => { serverSideStream.on("error", done); - // Mock push to always return false to accumulate rows and trigger pause - serverSideStream.push = jest.fn(() => { - return false; - }); - - // Send START message + // Send START message first sourceStream.write( JSON.stringify({ message_type: "START", @@ -111,31 +105,45 @@ describe("ServerSideStream", () => { }) + "\n" ); - // Send multiple DATA messages to exceed maxPendingRows threshold - let dataMessages 
= ""; - for (let i = 0; i < 3; i++) { - dataMessages += + setTimeout(() => { + // Mock push to always return false to accumulate rows and trigger pause + const originalPush = serverSideStream.push.bind(serverSideStream); + serverSideStream.push = jest.fn(chunk => { + if (chunk !== null) { + return false; // Always return false for data chunks to accumulate + } + return originalPush(chunk); + }); + + // Send large data message to exceed threshold and trigger pause + const largeDataMessage = JSON.stringify({ message_type: "DATA", - data: [[i * 3 + 0], [i * 3 + 1], [i * 3 + 2]] + data: Array.from({ length: 15 }, (_, i) => [i]) }) + "\n"; - } - sourceStream.write(dataMessages); - setTimeout(() => { - expect(pauseCalled).toBe(true); - - // Now test resume: change push to return true (allow processing) - serverSideStream.push = jest.fn(() => true); - - // Call _read to trigger the resume condition check - serverSideStream._read(); + sourceStream.write(largeDataMessage); setTimeout(() => { - expect(resumeCalled).toBe(true); - done(); + expect(pauseCalled).toBe(true); + + // Now test resume: change push to return true (allow processing) + serverSideStream.push = jest.fn(chunk => { + if (chunk !== null) { + return true; // Allow processing to drain buffer + } + return originalPush(chunk); + }); + + // Call _read to trigger the resume condition check + serverSideStream._read(); + + setTimeout(() => { + expect(resumeCalled).toBe(true); + done(); + }, 50); }, 50); - }, 100); + }, 10); }); it("should properly clean up on stream destruction", done => { @@ -159,7 +167,7 @@ describe("ServerSideStream", () => { ); // First pause the stream by creating backpressure - serverSideStream.push = jest.fn((_chunk: unknown) => { + serverSideStream.push = jest.fn(() => { return false; // Always return false to simulate backpressure }); @@ -172,15 +180,15 @@ describe("ServerSideStream", () => { ); // Send data to trigger pause - let dataMessages = ""; - for (let i = 0; i < 3; i++) { - 
dataMessages += + // Send messages one at a time to trigger backpressure check between messages + for (let i = 0; i < 6; i++) { + const dataMessage = JSON.stringify({ message_type: "DATA", - data: [[i * 3 + 0], [i * 3 + 1], [i * 3 + 2]] + data: [[i * 2 + 0], [i * 2 + 1]] }) + "\n"; + sourceStream.write(dataMessage); } - sourceStream.write(dataMessages); setTimeout(() => { // Destroy the stream @@ -220,18 +228,18 @@ describe("ServerSideStream", () => { }) + "\n" ); - let dataMessages = ""; - for (let i = 0; i < 3; i++) { - dataMessages += + // Send data one message at a time to trigger pause + for (let i = 0; i < 12; i++) { + const dataMessage = JSON.stringify({ message_type: "DATA", - data: [[i * 3 + 0]] + data: [[i]] }) + "\n"; + sourceStream.write(dataMessage); } // Send error message setTimeout(() => { - sourceStream.write(dataMessages); sourceStream.write( JSON.stringify({ message_type: "FINISH_WITH_ERRORS",