Skip to content

Commit 61260ca

Browse files
committed
try this approach to testing
1 parent f453926 commit 61260ca

File tree

1 file changed

+106
-30
lines changed

1 file changed

+106
-30
lines changed

test/integration/v2/stream.test.ts

Lines changed: 106 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,14 @@ describe("streams", () => {
259259
console.log(`Pipeline test: processed ${processedCount} rows successfully`);
260260
});
261261
it("stream with different data types and memory management", async () => {
262+
console.log("Starting data types streaming test...");
263+
const startTime = Date.now();
264+
262265
const firebolt = Firebolt({
263266
apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
264267
});
265268
const connection = await firebolt.connect(connectionParams);
269+
console.log("Connected to Firebolt");
266270

267271
// Generate a query with various data types
268272
const seriesNum = 100000;
@@ -279,6 +283,7 @@ describe("streams", () => {
279283
FROM generate_series(1, ${rows}) as i
280284
`;
281285

286+
console.log(`Executing query for ${seriesNum} rows...`);
282287
const statement = await connection.executeStream(
283288
generateLargeResultQuery(seriesNum),
284289
{
@@ -289,10 +294,13 @@ describe("streams", () => {
289294
}
290295
);
291296

297+
console.log("Getting stream result...");
292298
const { data } = await statement.streamResult();
299+
console.log("Stream result obtained, setting up listeners...");
293300

294301
// Add meta event handler to verify column metadata
295302
data.on("meta", m => {
303+
console.log("Meta received:", m.length, "columns");
296304
expect(m).toEqual([
297305
{ name: "id", type: "int" },
298306
{ name: "username", type: "text" },
@@ -305,15 +313,17 @@ describe("streams", () => {
305313
]);
306314
});
307315

308-
// Buffer pool configuration
309-
const poolSize = 8192; // 8KB
310-
const poolBuffer = Buffer.allocUnsafe(poolSize);
311-
const newlineCode = 0x0a; // '\n' character code
316+
// Add error handler to the data stream
317+
data.on("error", error => {
318+
console.error("Data stream error:", error);
319+
throw error;
320+
});
312321

313322
// Track memory usage
314323
const initialMemory = process.memoryUsage();
315324
let maxMemoryUsed = initialMemory.heapUsed;
316325
let rowCount = 0;
326+
let transformStart = 0;
317327

318328
// Create a JSON transform stream with minimal allocation
319329
const jsonTransform = new stream.Transform({
@@ -327,9 +337,20 @@ describe("streams", () => {
327337
try {
328338
rowCount++;
329339

330-
if (rowCount % 5000 === 0) {
340+
if (rowCount === 1) {
341+
transformStart = Date.now();
342+
console.log("Transform started, first row received");
343+
}
344+
345+
if (rowCount % 10000 === 0) {
331346
const currentMemory = process.memoryUsage();
332347
maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed);
348+
const elapsed = Date.now() - transformStart;
349+
const memoryMB = (currentMemory.heapUsed / 1024 / 1024).toFixed(2);
350+
console.log(
351+
`Transform progress: ${rowCount} rows processed in ${elapsed}ms, ` +
352+
`memory: ${memoryMB}MB`
353+
);
333354
}
334355

335356
// Verify data types are correct for normalized data on first row
@@ -369,65 +390,120 @@ describe("streams", () => {
369390
}
370391

371392
const json = JSON.stringify(row);
372-
const jsonLen = Buffer.byteLength(json);
373-
const totalLen = jsonLen + 1;
374-
375-
let buffer: Buffer;
376-
if (totalLen <= poolSize) {
377-
// Use pool for small rows - no allocation
378-
poolBuffer.write(json, 0, jsonLen);
379-
poolBuffer[jsonLen] = newlineCode;
380-
buffer = poolBuffer.subarray(0, totalLen);
381-
} else {
382-
// Allocate for large rows
383-
buffer = Buffer.allocUnsafe(totalLen);
384-
buffer.write(json, 0, jsonLen);
385-
buffer[jsonLen] = newlineCode;
393+
const jsonBuffer = Buffer.from(json + "\n", "utf8");
394+
395+
this.push(jsonBuffer);
396+
397+
if (rowCount === seriesNum) {
398+
const transformEnd = Date.now();
399+
const elapsed = transformEnd - transformStart;
400+
console.log(
401+
`Transform completed: ${seriesNum} rows processed in ${elapsed}ms`
402+
);
386403
}
387404

388-
this.push(buffer);
389405
callback();
390406
} catch (err) {
407+
console.error("Transform error:", err);
391408
callback(err as Error);
392409
}
393410
}
394411
});
395412

413+
// Add error handler to transform stream
414+
jsonTransform.on("error", error => {
415+
console.error("JSON transform stream error:", error);
416+
});
417+
396418
// Create a moderate backpressure stream
397419
let processedChunks = 0;
420+
let outputStart = 0;
398421
const outputStream = new stream.Writable({
399422
highWaterMark: 1,
400423
write(chunk, encoding, callback) {
424+
if (processedChunks === 0) {
425+
outputStart = Date.now();
426+
console.log("Output stream started processing chunks");
427+
}
428+
401429
processedChunks++;
402430

403-
// Simulate occasional slow processing with minimal delays
404-
if (processedChunks % 1000 === 0) {
405-
setTimeout(() => {
406-
callback();
407-
}, 1); // 1ms delay
408-
} else {
431+
// Log progress periodically
432+
if (processedChunks % 10000 === 0) {
433+
const elapsed = Date.now() - outputStart;
434+
console.log(
435+
`Output progress: ${processedChunks} chunks processed in ${elapsed}ms`
436+
);
437+
}
438+
439+
// Use synchronous processing instead of setTimeout to avoid timing issues in CI
440+
try {
441+
// Simulate some minimal processing without async delay
442+
if (processedChunks % 1000 === 0) {
443+
// Just add a tiny synchronous delay instead of async setTimeout
444+
const start = Date.now();
445+
while (Date.now() - start < 0.1) {
446+
// Busy wait for 0.1ms
447+
}
448+
}
409449
callback();
450+
} catch (err) {
451+
callback(err);
410452
}
411453
}
412454
});
413455

456+
// Add error handler to output stream
457+
outputStream.on("error", error => {
458+
console.error("Output stream error:", error);
459+
});
460+
461+
outputStream.on("finish", () => {
462+
const elapsed = Date.now() - outputStart;
463+
console.log(
464+
`Output stream finished: ${processedChunks} total chunks in ${elapsed}ms`
465+
);
466+
});
467+
414468
// Use pipeline for proper backpressure handling
415-
await stream.promises.pipeline(data, jsonTransform, outputStream);
469+
console.log("Starting pipeline...");
470+
const pipelineStart = Date.now();
471+
472+
// Create the main pipeline promise
473+
const pipelinePromise = stream.promises.pipeline(
474+
data,
475+
jsonTransform,
476+
outputStream
477+
);
478+
479+
try {
480+
// Execute the pipeline
481+
await pipelinePromise;
482+
const pipelineEnd = Date.now();
483+
console.log(`Pipeline completed in ${pipelineEnd - pipelineStart}ms`);
484+
} catch (error) {
485+
console.error("Pipeline error:", error);
486+
throw error;
487+
}
416488

417489
// Verify everything worked correctly
490+
console.log(
491+
`Final verification: rowCount=${rowCount}, processedChunks=${processedChunks}`
492+
);
418493
expect(rowCount).toBe(seriesNum);
419494
expect(processedChunks).toBeGreaterThan(0);
420495

421496
// Memory usage should remain reasonable with proper streaming
422497
const memoryGrowth =
423498
(maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024);
424-
expect(memoryGrowth).toBeLessThan(120); // Allow reasonable memory for complex data types with various field types
499+
expect(memoryGrowth).toBeLessThan(150); // Allow reasonable memory for complex data types with various field types
425500

501+
const totalElapsed = Date.now() - startTime;
426502
console.log(
427-
`Data types streaming test: processed ${rowCount} rows with various data types, ` +
503+
`Data types streaming test completed: processed ${rowCount} rows with various data types, ` +
428504
`memory growth: ${memoryGrowth.toFixed(
429505
2
430-
)} MB, processed chunks: ${processedChunks}`
506+
)} MB, processed chunks: ${processedChunks}, total time: ${totalElapsed}ms`
431507
);
432508
});
433509
});

0 commit comments

Comments
 (0)