1- import exp from "node:constants" ;
21import { Firebolt } from "../../../src" ;
32import stream , { TransformCallback } from "node:stream" ;
43import BigNumber from "bignumber.js" ;
@@ -259,10 +258,20 @@ describe("streams", () => {
259258 console . log ( `Pipeline test: processed ${ processedCount } rows successfully` ) ;
260259 } ) ;
261260 it ( "stream with different data types and memory management" , async ( ) => {
261+ console . log ( "[DEBUG] Starting data types streaming test" ) ;
262+
263+ // Add timeout tracking to identify where test hangs
264+ const timeoutId = setTimeout ( ( ) => {
265+ console . error (
266+ "[DEBUG] Test timeout warning - test has been running for 5 minutes"
267+ ) ;
268+ } , 300000 ) ; // 5 minutes
269+
262270 const firebolt = Firebolt ( {
263271 apiEndpoint : process . env . FIREBOLT_API_ENDPOINT as string
264272 } ) ;
265273 const connection = await firebolt . connect ( connectionParams ) ;
274+ console . log ( "[DEBUG] Connected to Firebolt" ) ;
266275
267276 // Generate a query with various data types
268277 const seriesNum = 100000 ;
@@ -279,6 +288,7 @@ describe("streams", () => {
279288 FROM generate_series(1, ${ rows } ) as i
280289 ` ;
281290
291+ console . log ( `[DEBUG] Executing stream query for ${ seriesNum } rows` ) ;
282292 const statement = await connection . executeStream (
283293 generateLargeResultQuery ( seriesNum ) ,
284294 {
@@ -288,11 +298,14 @@ describe("streams", () => {
288298 }
289299 }
290300 ) ;
301+ console . log ( "[DEBUG] Query execution started, getting stream result" ) ;
291302
292303 const { data } = await statement . streamResult ( ) ;
304+ console . log ( "[DEBUG] Stream result obtained, setting up event handlers" ) ;
293305
294306 // Add meta event handler to verify column metadata
295307 data . on ( "meta" , m => {
308+ console . log ( "[DEBUG] Received meta event with" , m . length , "columns" ) ;
296309 try {
297310 expect ( m ) . toEqual ( [
298311 { name : "id" , type : "int" } ,
@@ -304,14 +317,17 @@ describe("streams", () => {
304317 { name : "score" , type : "double" } ,
305318 { name : "description" , type : "text" }
306319 ] ) ;
320+ console . log ( "[DEBUG] Meta validation passed" ) ;
307321 } catch ( err ) {
322+ console . error ( "[DEBUG] Meta validation failed:" , err ) ;
308323 // Re-emit error so test fails properly
309324 data . destroy ( err as Error ) ;
310325 }
311326 } ) ;
312327
313328 // Add error handler to ensure any stream errors are caught
314329 data . on ( "error" , err => {
330+ console . error ( "[DEBUG] Stream error occurred:" , err . message ) ;
315331 throw err ;
316332 } ) ;
317333
@@ -324,8 +340,14 @@ describe("streams", () => {
324340 const initialMemory = process . memoryUsage ( ) ;
325341 let maxMemoryUsed = initialMemory . heapUsed ;
326342 let rowCount = 0 ;
343+ console . log (
344+ "[DEBUG] Initial memory:" ,
345+ ( initialMemory . heapUsed / ( 1024 * 1024 ) ) . toFixed ( 2 ) ,
346+ "MB"
347+ ) ;
327348
328349 // Create a JSON transform stream with minimal allocation
350+ console . log ( "[DEBUG] Creating JSON transform stream" ) ;
329351 const jsonTransform = new stream . Transform ( {
330352 objectMode : true ,
331353 highWaterMark : 1 , // Limit buffering - critical for memory
@@ -337,13 +359,26 @@ describe("streams", () => {
337359 try {
338360 rowCount ++ ;
339361
362+ // More frequent progress logging for debugging
363+ if ( rowCount % 1000 === 0 ) {
364+ const currentMemory = process . memoryUsage ( ) ;
365+ maxMemoryUsed = Math . max ( maxMemoryUsed , currentMemory . heapUsed ) ;
366+ console . log (
367+ `[DEBUG] Processed ${ rowCount } rows, memory: ${ (
368+ currentMemory . heapUsed /
369+ ( 1024 * 1024 )
370+ ) . toFixed ( 2 ) } MB`
371+ ) ;
372+ }
373+
340374 if ( rowCount % 5000 === 0 ) {
341375 const currentMemory = process . memoryUsage ( ) ;
342376 maxMemoryUsed = Math . max ( maxMemoryUsed , currentMemory . heapUsed ) ;
343377 }
344378
345379 // Verify data types are correct for normalized data on first row
346380 if ( rowCount === 1 ) {
381+ console . log ( "[DEBUG] Validating first row data types" ) ;
347382 const typedRow = row as Record < string , unknown > ;
348383
349384 // Verify actual values for first row
@@ -358,10 +393,12 @@ describe("streams", () => {
358393 expect ( typedRow . description ) . toBe (
359394 "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
360395 ) ;
396+ console . log ( "[DEBUG] First row validation passed" ) ;
361397 }
362398
363399 // Verify data types are correct for normalized data on last row
364400 if ( rowCount === seriesNum ) {
401+ console . log ( "[DEBUG] Validating last row data types" ) ;
365402 const typedRow = row as Record < string , unknown > ;
366403
367404 // Verify actual values for last row
@@ -376,6 +413,7 @@ describe("streams", () => {
376413 expect ( typedRow . description ) . toBe (
377414 "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
378415 ) ;
416+ console . log ( "[DEBUG] Last row validation passed" ) ;
379417 }
380418
381419 const json = JSON . stringify ( row ) ;
@@ -405,12 +443,20 @@ describe("streams", () => {
405443
406444 // Create a moderate backpressure stream
407445 let processedChunks = 0 ;
446+ console . log ( "[DEBUG] Creating output transform stream" ) ;
408447 const outputStream = new stream . Transform ( {
409448 highWaterMark : 1 ,
410449 transform ( chunk , encoding , callback ) {
411450 try {
412451 processedChunks ++ ;
413452
453+ // Log progress for debugging
454+ if ( processedChunks % 10000 === 0 ) {
455+ console . log (
456+ `[DEBUG] Output stream processed ${ processedChunks } chunks`
457+ ) ;
458+ }
459+
414460 // Simulate occasional slow processing with minimal delays
415461 if ( processedChunks % 1000 === 0 ) {
416462 // Use setImmediate instead of setTimeout to avoid potential timing issues
@@ -421,21 +467,27 @@ describe("streams", () => {
421467 callback ( ) ;
422468 }
423469 } catch ( err ) {
470+ console . error ( "[DEBUG] Error in output stream:" , err ) ;
424471 callback ( err as Error ) ;
425472 }
426473 }
427474 } ) ;
428475
476+ console . log ( "[DEBUG] Starting pipeline" ) ;
429477 // Use pipeline for proper backpressure handling
430478 await stream . promises . pipeline ( data , jsonTransform , outputStream ) ;
479+ console . log ( "[DEBUG] Pipeline completed successfully" ) ;
431480
432481 // Verify everything worked correctly
482+ console . log ( `[DEBUG] Final row count: ${ rowCount } , expected: ${ seriesNum } ` ) ;
433483 expect ( rowCount ) . toBe ( seriesNum ) ;
484+ console . log ( `[DEBUG] Final processed chunks: ${ processedChunks } ` ) ;
434485 expect ( processedChunks ) . toBeGreaterThan ( 0 ) ;
435486
436487 // Memory usage should remain reasonable with proper streaming
437488 const memoryGrowth =
438489 ( maxMemoryUsed - initialMemory . heapUsed ) / ( 1024 * 1024 ) ;
490+ console . log ( `[DEBUG] Memory growth: ${ memoryGrowth . toFixed ( 2 ) } MB` ) ;
439491 expect ( memoryGrowth ) . toBeLessThan ( 120 ) ; // Allow reasonable memory for complex data types with various field types
440492
441493 console . log (
@@ -444,5 +496,7 @@ describe("streams", () => {
444496 2
445497 ) } MB, processed chunks: ${ processedChunks } `
446498 ) ;
499+ console . log ( "[DEBUG] Test completed successfully" ) ;
500+ clearTimeout ( timeoutId ) ;
447501 } ) ;
448502} ) ;
0 commit comments