Skip to content

Commit 9173f55

Browse files
committed
verify no performance degradation
1 parent db40625 commit 9173f55

File tree

2 files changed

+330
-1
lines changed

2 files changed

+330
-1
lines changed

src/statement/stream/serverSideStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ export class ServerSideStream extends Readable {
1616
private finished = false;
1717
private processingData = false;
1818
private readlineInterfacePaused = false;
19-
private readonly maxPendingRows = 10; // Limit pending rows to prevent memory buildup
19+
private readonly maxPendingRows = 5; // Limit pending rows to prevent memory buildup
2020

2121
constructor(
2222
private readonly response: Response,
Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
import { Firebolt } from "../../../src/index";
import * as stream from "stream";

// Connection settings for the integration suite, sourced entirely from the
// environment so the tests can target any account/engine in CI.
// NOTE(review): the `as string` casts assume every variable is set — a missing
// one is passed through as `undefined` and only surfaces later as an
// auth/connect failure. Verify the CI environment exports all five.
const connectionParams = {
  auth: {
    client_id: process.env.FIREBOLT_CLIENT_ID as string,
    client_secret: process.env.FIREBOLT_CLIENT_SECRET as string
  },
  account: process.env.FIREBOLT_ACCOUNT as string,
  database: process.env.FIREBOLT_DATABASE as string,
  engineName: process.env.FIREBOLT_ENGINE_NAME as string
};

// These tests pull up to 100k rows from a live engine; allow ~4 minutes each.
jest.setTimeout(250000);
describe("performance comparison", () => {
17+
const generateLargeResultQuery = (rows: number) => `
18+
SELECT
19+
i as id,
20+
'user_' || i::string as username,
21+
'email_' || i::string || '@example.com' as email,
22+
CASE WHEN i % 2 = 0 THEN true ELSE false END as status,
23+
CAST('100000000000000000' as BIGINT) as big_number,
24+
'2024-01-01'::date + (i % 365) as created_date,
25+
RANDOM() * 1000 as score,
26+
'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' as description
27+
FROM generate_series(1, ${rows}) as i
28+
`;
29+
30+
it("compare normal vs streaming execution performance", async () => {
31+
const firebolt = Firebolt({
32+
apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
33+
});
34+
35+
const connection = await firebolt.connect(connectionParams);
36+
const seriesNum = 100000;
37+
const query = generateLargeResultQuery(seriesNum);
38+
39+
console.log(`\nTesting performance with ${seriesNum} rows...`);
40+
41+
// Test normal execution (fetchResult)
42+
const normalStartTime = process.hrtime.bigint();
43+
44+
const statement = await connection.execute(query, {
45+
response: {
46+
normalizeData: true,
47+
bigNumberAsString: false
48+
}
49+
});
50+
51+
const { data: normalData } = await statement.fetchResult();
52+
const normalEndTime = process.hrtime.bigint();
53+
54+
const normalExecutionTime = Number(normalEndTime - normalStartTime) / 1e6; // Convert to milliseconds
55+
56+
console.log(`Normal execution: ${normalExecutionTime.toFixed(2)}ms`);
57+
expect(normalData.length).toBe(seriesNum);
58+
59+
// Test streaming execution (streamResult)
60+
const streamStartTime = process.hrtime.bigint();
61+
62+
const streamStatement = await connection.executeStream(query, {
63+
response: {
64+
normalizeData: true,
65+
bigNumberAsString: false
66+
}
67+
});
68+
69+
const { data: streamData } = await streamStatement.streamResult();
70+
71+
let streamRowCount = 0;
72+
73+
// Process streaming data
74+
const processStreamData = new Promise<void>((resolve, reject) => {
75+
streamData.on("data", () => {
76+
streamRowCount++;
77+
});
78+
79+
streamData.on("end", () => {
80+
resolve();
81+
});
82+
83+
streamData.on("error", error => {
84+
reject(error);
85+
});
86+
});
87+
88+
await processStreamData;
89+
const streamEndTime = process.hrtime.bigint();
90+
91+
const streamExecutionTime = Number(streamEndTime - streamStartTime) / 1e6; // Convert to milliseconds
92+
93+
console.log(`Stream execution: ${streamExecutionTime.toFixed(2)}ms`);
94+
expect(streamRowCount).toBe(seriesNum);
95+
96+
// Performance analysis
97+
const timeDifference = normalExecutionTime - streamExecutionTime;
98+
99+
console.log(`\nPerformance Analysis:`);
100+
console.log(
101+
`Time difference: ${timeDifference.toFixed(2)}ms (${
102+
timeDifference > 0 ? "streaming faster" : "normal faster"
103+
})`
104+
);
105+
console.log(
106+
`Speed ratio: ${(normalExecutionTime / streamExecutionTime).toFixed(2)}x`
107+
);
108+
109+
// Verify both methods processed the same number of rows
110+
expect(streamRowCount).toBe(normalData.length); // Same number of rows processed
111+
112+
// Ensure streaming is not more than 10% slower than normal execution
113+
const maxAllowedStreamTime = normalExecutionTime * 1.1; // 10% slower threshold
114+
expect(streamExecutionTime).toBeLessThanOrEqual(maxAllowedStreamTime);
115+
});
116+
117+
it("compare streaming vs normal with pipeline processing", async () => {
118+
const firebolt = Firebolt({
119+
apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
120+
});
121+
122+
const connection = await firebolt.connect(connectionParams);
123+
const seriesNum = 50000; // Smaller dataset for pipeline comparison
124+
const query = generateLargeResultQuery(seriesNum);
125+
126+
console.log(`\nTesting pipeline performance with ${seriesNum} rows...`);
127+
128+
// Test normal execution with manual processing
129+
const normalStartTime = process.hrtime.bigint();
130+
131+
const statement = await connection.execute(query, {
132+
response: {
133+
normalizeData: true,
134+
bigNumberAsString: false
135+
}
136+
});
137+
138+
const { data: normalData } = await statement.fetchResult();
139+
140+
// Simulate processing (similar to what streaming pipeline would do)
141+
let processedNormalCount = 0;
142+
const processedNormalData: string[] = [];
143+
for (const row of normalData) {
144+
processedNormalCount++;
145+
processedNormalData.push(JSON.stringify(row));
146+
}
147+
148+
const normalEndTime = process.hrtime.bigint();
149+
150+
const normalExecutionTime = Number(normalEndTime - normalStartTime) / 1e6;
151+
152+
console.log(`Normal with processing: ${normalExecutionTime.toFixed(2)}ms`);
153+
154+
// Test streaming with pipeline processing
155+
const streamStartTime = process.hrtime.bigint();
156+
157+
const streamStatement = await connection.executeStream(query, {
158+
response: {
159+
normalizeData: true,
160+
bigNumberAsString: false
161+
}
162+
});
163+
164+
const { data: streamData } = await streamStatement.streamResult();
165+
166+
let processedStreamCount = 0;
167+
168+
// Create processing pipeline
169+
const jsonTransform = new stream.Transform({
170+
objectMode: true,
171+
transform(
172+
row: unknown,
173+
encoding: BufferEncoding,
174+
callback: (error?: Error | null) => void
175+
) {
176+
try {
177+
processedStreamCount++;
178+
179+
const json = JSON.stringify(row);
180+
this.push(json);
181+
callback();
182+
} catch (err) {
183+
callback(err as Error);
184+
}
185+
}
186+
});
187+
188+
const processedStreamData: string[] = [];
189+
const collectStream = new stream.Writable({
190+
objectMode: true,
191+
write(chunk: string, encoding, callback) {
192+
processedStreamData.push(chunk);
193+
callback();
194+
}
195+
});
196+
197+
// Use pipeline for proper backpressure handling
198+
await stream.promises.pipeline(streamData, jsonTransform, collectStream);
199+
200+
const streamEndTime = process.hrtime.bigint();
201+
202+
const streamExecutionTime = Number(streamEndTime - streamStartTime) / 1e6;
203+
204+
console.log(`Stream with pipeline: ${streamExecutionTime.toFixed(2)}ms`);
205+
206+
// Verify results
207+
expect(processedStreamCount).toBe(seriesNum);
208+
expect(processedNormalCount).toBe(seriesNum);
209+
expect(processedStreamData.length).toBe(processedNormalData.length);
210+
211+
// Performance analysis
212+
const timeDifference = normalExecutionTime - streamExecutionTime;
213+
214+
console.log(`\nPipeline Performance Analysis:`);
215+
console.log(
216+
`Time difference: ${timeDifference.toFixed(2)}ms (${
217+
timeDifference > 0 ? "streaming faster" : "normal faster"
218+
})`
219+
);
220+
console.log(
221+
`Speed ratio: ${(normalExecutionTime / streamExecutionTime).toFixed(2)}x`
222+
);
223+
console.log(
224+
`Processing efficiency: ${(
225+
processedStreamCount / processedNormalCount
226+
).toFixed(2)}x`
227+
);
228+
229+
// Verify results
230+
expect(processedStreamCount).toBe(seriesNum);
231+
expect(processedNormalCount).toBe(seriesNum);
232+
expect(processedStreamData.length).toBe(processedNormalData.length);
233+
234+
// Ensure streaming pipeline is not more than 10% slower than normal execution
235+
const maxAllowedStreamTime = normalExecutionTime * 1.1; // 10% slower threshold
236+
expect(streamExecutionTime).toBeLessThanOrEqual(maxAllowedStreamTime);
237+
});
238+
239+
it("execution time comparison with different dataset sizes", async () => {
240+
const firebolt = Firebolt({
241+
apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
242+
});
243+
244+
const connection = await firebolt.connect(connectionParams);
245+
const testSizes = [10000, 25000, 50000]; // Different dataset sizes
246+
247+
const results: Array<{
248+
size: number;
249+
normalTime: number;
250+
streamTime: number;
251+
}> = [];
252+
253+
for (const size of testSizes) {
254+
console.log(`\nTesting dataset size: ${size} rows`);
255+
const query = generateLargeResultQuery(size);
256+
257+
// Test normal execution
258+
const normalStartTime = process.hrtime.bigint();
259+
260+
const statement = await connection.execute(query, {
261+
response: { normalizeData: true, bigNumberAsString: false }
262+
});
263+
264+
const { data: normalData } = await statement.fetchResult();
265+
const normalEndTime = process.hrtime.bigint();
266+
267+
const normalTime = Number(normalEndTime - normalStartTime) / 1e6;
268+
269+
// Test streaming execution
270+
const streamStartTime = process.hrtime.bigint();
271+
272+
const streamStatement = await connection.executeStream(query, {
273+
response: { normalizeData: true, bigNumberAsString: false }
274+
});
275+
276+
const { data: streamData } = await streamStatement.streamResult();
277+
278+
let streamRowCount = 0;
279+
280+
await new Promise<void>((resolve, reject) => {
281+
streamData.on("data", () => {
282+
streamRowCount++;
283+
});
284+
285+
streamData.on("end", resolve);
286+
streamData.on("error", reject);
287+
});
288+
289+
const streamEndTime = process.hrtime.bigint();
290+
const streamTime = Number(streamEndTime - streamStartTime) / 1e6;
291+
292+
results.push({
293+
size,
294+
normalTime,
295+
streamTime
296+
});
297+
298+
console.log(
299+
`Size ${size}: Normal(${normalTime.toFixed(
300+
2
301+
)}ms) vs Stream(${streamTime.toFixed(2)}ms)`
302+
);
303+
304+
expect(normalData.length).toBe(size);
305+
expect(streamRowCount).toBe(size);
306+
}
307+
308+
// Analysis across different sizes
309+
console.log(`\nExecution Time Scaling Analysis:`);
310+
for (const result of results) {
311+
const timeRatio = result.streamTime / result.normalTime;
312+
const timeDifference = result.normalTime - result.streamTime;
313+
314+
console.log(
315+
`Size ${result.size}: Time difference ${timeDifference.toFixed(
316+
2
317+
)}ms, Speed ratio ${timeRatio.toFixed(2)}x`
318+
);
319+
320+
// Both methods should complete successfully
321+
expect(result.normalTime).toBeGreaterThan(0);
322+
expect(result.streamTime).toBeGreaterThan(0);
323+
324+
// Ensure streaming is not more than 10% slower than normal execution for each dataset size
325+
const maxAllowedStreamTime = result.normalTime * 1.1; // 10% slower threshold
326+
expect(result.streamTime).toBeLessThanOrEqual(maxAllowedStreamTime);
327+
}
328+
});
329+
});

0 commit comments

Comments
 (0)