
Commit cc9ed5d

file-based backpressure
1 parent 010bdbf commit cc9ed5d

File tree: 1 file changed (+88, -11)


test/integration/v2/stream.test.ts

Lines changed: 88 additions & 11 deletions
@@ -2,6 +2,9 @@ import exp from "node:constants";
 import { Firebolt } from "../../../src";
 import stream, { TransformCallback } from "node:stream";
 import BigNumber from "bignumber.js";
+import fs from "node:fs";
+import path from "node:path";
+import os from "node:os";

 const connectionParams = {
   auth: {
@@ -397,21 +400,71 @@ describe("streams", () => {
       }
     });

-    // Create a moderate backpressure stream
+    // Create a batch processing stream that writes to a temporary file
     let processedChunks = 0;
+    const batchSize = 100;
+    let batchBuffer: string[] = [];
+
+    // Create a temporary file for writing
+    const tempDir = os.tmpdir();
+    const tempFilePath = path.join(
+      tempDir,
+      `firebolt-stream-test-${Date.now()}.jsonl`
+    );
+    const fileWriteStream = fs.createWriteStream(tempFilePath, { flags: "w" });
+
     const outputStream = new stream.Writable({
-      highWaterMark: 1,
-      write(chunk, encoding, callback) {
+      highWaterMark: 1, // Reasonable buffer size
+      write(chunk: Buffer, encoding, callback) {
         processedChunks++;
+        const chunkStr = chunk.toString();
+        batchBuffer.push(chunkStr);
+
+        // Process in batches to create natural backpressure patterns
+        if (batchBuffer.length >= batchSize) {
+          // Process the batch synchronously (simulate some work)
+          const batchData = batchBuffer.join("");
+          const lines = batchData.split("\n").filter(line => line.trim());
+
+          // Write valid JSON lines to the file
+          for (const line of lines) {
+            try {
+              JSON.parse(line); // Verify it's valid JSON
+              fileWriteStream.write(line + "\n");
+            } catch (e) {
+              // Skip invalid JSON lines
+            }
+          }

-        // Simulate occasional slow processing with event loop yielding
-        if (processedChunks % 1000 === 0) {
-          setImmediate(() => {
-            callback();
-          });
-        } else {
-          callback();
+          // Clear the batch
+          batchBuffer = [];
+        }
+
+        callback();
+      },
+
+      final(callback) {
+        // Process any remaining items in the final batch
+        if (batchBuffer.length > 0) {
+          const batchData = batchBuffer.join("");
+          const lines = batchData.split("\n").filter(line => line.trim());
+
+          // Write remaining valid JSON lines to the file
+          for (const line of lines) {
+            try {
+              JSON.parse(line); // Verify it's valid JSON
+              fileWriteStream.write(line + "\n");
+            } catch (e) {
+              // Skip invalid JSON lines
+            }
+          }
+          batchBuffer = [];
         }
+
+        // Close the file stream
+        fileWriteStream.end(() => {
+          callback();
+        });
       }
     });
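Note on the mechanism (not part of the diff): a Node.js Writable exerts backpressure through two signals, the boolean returned by writable.write() and the moment the write() implementation invokes its callback. The version above calls callback() synchronously and ignores the return value of fileWriteStream.write(), so the upstream producer is throttled only by highWaterMark: 1 and the synchronous per-batch work. A minimal sketch of a sink that also propagates the file stream's own backpressure; the helper name makeFileBackpressureSink and its filePath parameter are illustrative, not from the commit:

import fs from "node:fs";
import stream from "node:stream";

// Sketch only: defer the callback until the file stream drains, so a slow
// disk pauses the upstream Readable as well.
function makeFileBackpressureSink(filePath: string): stream.Writable {
  const file = fs.createWriteStream(filePath, { flags: "w" });
  return new stream.Writable({
    highWaterMark: 1, // keep at most one chunk buffered upstream
    write(chunk: Buffer, _encoding, callback) {
      // write() returns false once the file stream's buffer is full;
      // waiting for "drain" before calling back pauses the producer.
      if (file.write(chunk)) {
        callback();
      } else {
        file.once("drain", () => callback());
      }
    },
    final(callback) {
      file.end(callback); // flush and close before signaling completion
    },
  });
}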

@@ -422,6 +475,26 @@ describe("streams", () => {
     expect(rowCount).toBe(seriesNum);
     expect(processedChunks).toBeGreaterThan(0);

+    // Verify the file was created and has content
+    expect(fs.existsSync(tempFilePath)).toBe(true);
+    const fileStats = fs.statSync(tempFilePath);
+    expect(fileStats.size).toBeGreaterThan(0);
+
+    // Read a few lines from the file to verify JSON format
+    const fileContent = fs.readFileSync(tempFilePath, "utf-8");
+    const lines = fileContent.split("\n").filter(line => line.trim());
+    expect(lines.length).toBeGreaterThan(0);
+
+    // Verify first line is valid JSON
+    if (lines.length > 0) {
+      const firstRow = JSON.parse(lines[0]);
+      expect(firstRow).toHaveProperty("id");
+      expect(firstRow).toHaveProperty("username");
+    }
+
+    // Clean up the temporary file
+    fs.unlinkSync(tempFilePath);
+
     // Memory usage should remain reasonable with proper streaming
     const memoryGrowth =
       (maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024);
@@ -431,7 +504,11 @@ describe("streams", () => {
       `Data types streaming test: processed ${rowCount} rows with various data types, ` +
       `memory growth: ${memoryGrowth.toFixed(
         2
-      )} MB, processed chunks: ${processedChunks}`
+      )} MB, processed chunks: ${processedChunks}, file size: ${(
+        fileStats.size /
+        1024 /
+        1024
+      ).toFixed(2)} MB`
     );
   });
 });
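The diff does not show how outputStream is consumed; presumably the test pipes the query's row stream into it. A self-contained sketch of that wiring, with a synthetic Readable standing in for the Firebolt query stream (the producer, the demo function, and the reuse of makeFileBackpressureSink from the sketch above are all illustrative assumptions):

import stream from "node:stream";
import { pipeline } from "node:stream/promises";

async function demo(): Promise<void> {
  // Synthetic producer: 10k JSON lines in place of the real query stream.
  let i = 0;
  const producer = new stream.Readable({
    read() {
      this.push(i < 10_000 ? JSON.stringify({ id: i++ }) + "\n" : null);
    },
  });

  // With highWaterMark: 1, the producer pauses whenever the sink holds a
  // buffered chunk and resumes as soon as write() invokes its callback.
  const sink = makeFileBackpressureSink("/tmp/demo.jsonl");

  // pipeline() wires the pause/resume handshake and awaits completion.
  await pipeline(producer, sink);
}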
