
Commit 6d0e1b5

refactor

1 parent cc9ed5d

2 files changed: +276 -254 lines

2 files changed

+276
-254
lines changed
Lines changed: 276 additions & 0 deletions
@@ -0,0 +1,276 @@
import { Firebolt } from "../../../src";
import stream from "node:stream";
import fs from "node:fs";
import path from "node:path";
import os from "node:os";
import BigNumber from "bignumber.js";

const connectionParams = {
  auth: {
    client_id: process.env.FIREBOLT_CLIENT_ID as string,
    client_secret: process.env.FIREBOLT_CLIENT_SECRET as string
  },
  account: process.env.FIREBOLT_ACCOUNT as string,
  database: process.env.FIREBOLT_DATABASE as string,
  engineName: process.env.FIREBOLT_ENGINE_NAME as string
};

jest.setTimeout(350000);

describe("advanced stream tests", () => {
  it("stream with different data types and memory management", async () => {
    const firebolt = Firebolt({
      apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
    });
    const connection = await firebolt.connect(connectionParams);

    // Generate a query with various data types
    const seriesNum = 100000;
    const generateLargeResultQuery = (rows: number) => `
      SELECT
        i as id,
        'user_' || i::string as username,
        'email_' || i::string || '@example.com' as email,
        CASE WHEN i % 2 = 0 THEN true ELSE false END as status,
        CAST('100000000000000000' as BIGINT) as big_number,
        '2024-01-01'::date + (i % 365) as created_date,
        RANDOM() * 1000 as score,
        'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' as description
      FROM generate_series(1, ${rows}) as i
    `;

    const statement = await connection.executeStream(
      generateLargeResultQuery(seriesNum),
      {
        response: {
          normalizeData: true,
          bigNumberAsString: false
        }
      }
    );

    const { data } = await statement.streamResult();

    // Add meta event handler to verify column metadata
    data.on("meta", m => {
      expect(m).toEqual([
        { name: "id", type: "int" },
        { name: "username", type: "text" },
        { name: "email", type: "text" },
        { name: "status", type: "boolean" },
        { name: "big_number", type: "long" },
        { name: "created_date", type: "date" },
        { name: "score", type: "double" },
        { name: "description", type: "text" }
      ]);
    });

    // Buffer pool configuration
    const poolSize = 8192; // 8KB
    const poolBuffer = Buffer.allocUnsafe(poolSize);
    const newlineCode = 0x0a; // '\n' character code

    // Track memory usage
    const initialMemory = process.memoryUsage();
    let maxMemoryUsed = initialMemory.heapUsed;
    let rowCount = 0;

    // Create a JSON transform stream with minimal allocation
    const jsonTransform = new stream.Transform({
      objectMode: true,
      highWaterMark: 1, // Limit buffering - critical for memory
      transform(
        row: unknown,
        encoding: BufferEncoding,
        callback: (error?: Error | null) => void
      ) {
        try {
          rowCount++;

          if (rowCount % 5000 === 0) {
            const currentMemory = process.memoryUsage();
            maxMemoryUsed = Math.max(maxMemoryUsed, currentMemory.heapUsed);
            if (global.gc) {
              console.log(`Invoking GC at row ${rowCount}`);
              global.gc();
            }
          }

          // Verify data types are correct for normalized data on first row
          if (rowCount === 1) {
            const typedRow = row as Record<string, unknown>;

            // Verify actual values for first row
            expect(typedRow.id).toBe(1);
            expect(typedRow.username).toBe("user_1");
expect(typedRow.email).toBe("[email protected]");
107+
            expect(typedRow.status).toBe(false); // i=1, 1%2=1, so false
            expect(typedRow.big_number).toEqual(
              new BigNumber("100000000000000000")
            );
            expect(typedRow.created_date).toEqual(new Date("2024-01-02")); // 2024-01-01 + 1 day
            expect(typedRow.description).toBe(
              "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
            );
          }

          // Verify data types are correct for normalized data on last row
          if (rowCount === seriesNum) {
            const typedRow = row as Record<string, unknown>;

            // Verify actual values for last row
            expect(typedRow.id).toBe(seriesNum);
            expect(typedRow.username).toBe(`user_${seriesNum}`);
            expect(typedRow.email).toBe(`email_${seriesNum}@example.com`);
            expect(typedRow.status).toBe(true); // seriesNum=100000, 100000%2=0, so true
            expect(typedRow.big_number).toEqual(
              new BigNumber("100000000000000000")
            );
expect(typedRow.created_date).toEqual(new Date("2024-12-21")); // 2024-01-01 + (100000 % 365) = 2024-01-01 + 269 days
130+
            expect(typedRow.description).toBe(
              "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
            );
          }

          const json = JSON.stringify(row);
          const jsonLen = Buffer.byteLength(json);
          const totalLen = jsonLen + 1;

          let buffer: Buffer;
          if (totalLen <= poolSize) {
            // Use pool for small rows - no allocation
            poolBuffer.write(json, 0, jsonLen);
            poolBuffer[jsonLen] = newlineCode;
            buffer = poolBuffer.subarray(0, totalLen);
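            // Note: subarray() returns a view into poolBuffer rather than a
            // copy, so this chunk must be consumed downstream before the next
            // transform() call reuses and overwrites the pool.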
          } else {
            // Allocate for large rows
            buffer = Buffer.allocUnsafe(totalLen);
            buffer.write(json, 0, jsonLen);
            buffer[jsonLen] = newlineCode;
          }

          this.push(buffer);
          callback();
        } catch (err) {
          callback(err as Error);
        }
      }
    });

    // Create a batch processing stream that writes to a temporary file
    let processedChunks = 0;
    const batchSize = 100;
    let batchBuffer: string[] = [];

    // Create a temporary file for writing
    const tempDir = os.tmpdir();
    const tempFilePath = path.join(
      tempDir,
      `firebolt-stream-test-${Date.now()}.jsonl`
    );
    const fileWriteStream = fs.createWriteStream(tempFilePath, { flags: "w" });

    const outputStream = new stream.Writable({
      highWaterMark: 1, // Limit buffering - keeps backpressure tight
      write(chunk: Buffer, encoding, callback) {
        processedChunks++;
        const chunkStr = chunk.toString();
        batchBuffer.push(chunkStr);

        // Process in batches to create natural backpressure patterns
        if (batchBuffer.length >= batchSize) {
          // Process the batch synchronously (simulate some work)
          const batchData = batchBuffer.join("");
          const lines = batchData.split("\n").filter(line => line.trim());

          // Write valid JSON lines to the file
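          // Note: the boolean backpressure signal returned by
          // fileWriteStream.write() is not checked here; overflow simply
          // queues in the file stream's internal buffer.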
          for (const line of lines) {
            try {
              JSON.parse(line); // Verify it's valid JSON
              fileWriteStream.write(line + "\n");
            } catch (e) {
              // Skip invalid JSON lines
            }
          }

          // Clear the batch
          batchBuffer = [];
        }

        callback();
      },

      final(callback) {
        // Process any remaining items in the final batch
        if (batchBuffer.length > 0) {
          const batchData = batchBuffer.join("");
          const lines = batchData.split("\n").filter(line => line.trim());

          // Write remaining valid JSON lines to the file
          for (const line of lines) {
            try {
              JSON.parse(line); // Verify it's valid JSON
              fileWriteStream.write(line + "\n");
            } catch (e) {
              // Skip invalid JSON lines
            }
          }
          batchBuffer = [];
        }

        // Close the file stream
        fileWriteStream.end(() => {
          callback();
        });
      }
    });

    // Use pipeline for proper backpressure handling
    await stream.promises.pipeline(data, jsonTransform, outputStream);

    // Verify everything worked correctly
    expect(rowCount).toBe(seriesNum);
    expect(processedChunks).toBeGreaterThan(0);

    // Verify the file was created and has content
    expect(fs.existsSync(tempFilePath)).toBe(true);
    const fileStats = fs.statSync(tempFilePath);
    expect(fileStats.size).toBeGreaterThan(0);

    // Read a few lines from the file to verify JSON format
    const fileContent = fs.readFileSync(tempFilePath, "utf-8");
    const lines = fileContent.split("\n").filter(line => line.trim());
    expect(lines.length).toBeGreaterThan(0);

    // Verify first line is valid JSON
    if (lines.length > 0) {
      const firstRow = JSON.parse(lines[0]);
      expect(firstRow).toHaveProperty("id");
      expect(firstRow).toHaveProperty("username");
    }

    // Clean up the temporary file
    fs.unlinkSync(tempFilePath);

    // Memory usage should remain reasonable with proper streaming
    const memoryGrowth =
      (maxMemoryUsed - initialMemory.heapUsed) / (1024 * 1024);

    if (global.gc) {
      expect(memoryGrowth).toBeLessThan(20);
    } else {
      expect(memoryGrowth).toBeLessThan(100);
    }

    console.log(
      `Data types streaming test: processed ${rowCount} rows with various data types, ` +
        `memory growth: ${memoryGrowth.toFixed(2)} MB, ` +
        `processed chunks: ${processedChunks}, ` +
        `file size: ${(fileStats.size / 1024 / 1024).toFixed(2)} MB`
    );
  });
});
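
Note: the tiered memory assertions above branch on global.gc, which Node defines only when the process is started with the --expose-gc flag; without it the test falls back to the looser 100 MB bound. One way to get the stricter 20 MB check is to run Jest through Node directly, for example (command is illustrative; adjust the test path to this project's layout):

node --expose-gc ./node_modules/.bin/jest <path-to-this-test>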
