
Commit d7f51b2
fix: performance issues
1 parent f2d6c1c

3 files changed: +67 -104 lines

src/services/api/streaming.ts

Lines changed: 5 additions & 16 deletions
@@ -1,8 +1,7 @@
-import {isQueryResponseChunk, isSessionChunk} from '../../store/reducers/query/utils';
+import {isSessionChunk} from '../../store/reducers/query/utils';
 import type {Actions, TracingLevel} from '../../types/api/query';
 import type {QuerySyntax, StatisticsMode} from '../../types/store/query';
 import type {StreamingChunk} from '../../types/store/streaming';
-import {StreamDataBatcher} from '../../utils/StreamDataBatcher';
 import {BINARY_DATA_IN_PLAIN_TEXT_DISPLAY} from '../../utils/constants';
 import {MultipartStreamParser} from '../parsers/parseMultipartStream';
 import {settingsManager} from '../settings';
@@ -36,24 +35,15 @@ export class StreamingAPI extends BaseYdbAPI {
         );

         let traceId: string | undefined = '';
-        const batcher = new StreamDataBatcher();

         const streamParser = new MultipartStreamParser((chunk: StreamingChunk) => {
             if (isSessionChunk(chunk)) {
                 const sessionChunk = chunk;
                 sessionChunk.meta.trace_id = traceId;
                 options.onChunk(chunk);
-            } else if (isQueryResponseChunk(chunk)) {
-                // Flush any batched chunks before sending response
-                const batchedChunks = batcher.flush();
-                batchedChunks.forEach((batchedChunk) => options.onChunk(batchedChunk));
-                options.onChunk(chunk);
             } else {
-                // For stream data chunks, try to batch them
-                const batchedChunk = batcher.addChunk(chunk);
-                if (batchedChunk) {
-                    options.onChunk(batchedChunk);
-                }
+                // Send chunks directly without batching
+                options.onChunk(chunk);
             }
         });
@@ -100,10 +90,9 @@ export class StreamingAPI extends BaseYdbAPI {
                 }
                 streamParser.processBuffer(value);
             }
+        } catch (e) {
+            console.error(e);
         } finally {
-            // Flush any remaining batched chunks
-            const remainingChunks = batcher.flush();
-            remainingChunks.forEach((chunk) => options.onChunk(chunk));
             reader.releaseLock();
         }
     }
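
With the producer-side batcher removed, the parser callback above forwards every chunk the moment it is parsed, and the new catch block logs read errors instead of letting them skip cleanup. A minimal sketch of the resulting read-loop shape, assuming only the web ReadableStream API (the helper name and callback signature are illustrative, not from the repository):

    async function consumeBody(
        body: ReadableStream<Uint8Array>,
        onBuffer: (value: Uint8Array) => void,
    ) {
        const reader = body.getReader();
        try {
            while (true) {
                const {done, value} = await reader.read();
                if (done) {
                    break;
                }
                // Each buffer goes straight to the multipart parser, which
                // invokes options.onChunk once per parsed chunk
                onBuffer(value);
            }
        } catch (e) {
            // Log and fall through: the error must not bypass releaseLock
            console.error(e);
        } finally {
            // Release the reader's lock so the stream is not left locked
            reader.releaseLock();
        }
    }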

src/store/reducers/query/query.ts

Lines changed: 62 additions & 27 deletions
@@ -1,3 +1,4 @@
+/* eslint-disable complexity */
 /* eslint-disable no-param-reassign */
 import {createSlice} from '@reduxjs/toolkit';
 import type {PayloadAction} from '@reduxjs/toolkit';
@@ -6,7 +7,7 @@ import {settingsManager} from '../../../services/settings';
 import type {ColumnType} from '../../../types/api/query';
 import {TracingLevelNumber} from '../../../types/api/query';
 import type {QueryAction, QueryRequestParams, QuerySettings} from '../../../types/store/query';
-import type {StreamingChunk} from '../../../types/store/streaming';
+import type {StreamDataChunk, StreamingChunk} from '../../../types/store/streaming';
 import {QUERIES_HISTORY_KEY} from '../../../utils/constants';
 import {isQueryErrorResponse, parseResult} from '../../../utils/query';
 import {isNumeric} from '../../../utils/utils';
@@ -133,7 +134,7 @@ const slice = createSlice({
         setQueryHistoryFilter: (state, action: PayloadAction<string>) => {
             state.history.filter = action.payload;
         },
-        addStreamingChunk: (state, action: PayloadAction<StreamingChunk>) => {
+        addStreamingChunks: (state, action: PayloadAction<StreamingChunk[]>) => {
             if (!state.result) {
                 return;
             }
@@ -142,16 +143,37 @@ const slice = createSlice({
                 state.result.data = prepareQueryData(null);
             }

-            const chunk = action.payload;
+            let mergedStreamDataChunk: StreamDataChunk | null = null;
+            for (const chunk of action.payload) {
+                if (isSessionChunk(chunk)) {
+                    state.result.queryId = chunk.meta.query_id;
+                    state.result.data.traceId = chunk.meta.trace_id;
+                } else if (isStreamDataChunk(chunk)) {
+                    if (mergedStreamDataChunk) {
+                        mergedStreamDataChunk.result.rows?.push(...(chunk.result.rows || []));
+                    } else {
+                        mergedStreamDataChunk = chunk;
+                    }
+                } else if (isQueryResponseChunk(chunk)) {
+                    state.result.isLoading = false;
+
+                    if (state.result.data) {
+                        const {plan: rawPlan, stats} = chunk;
+                        const {simplifiedPlan, ...planData} = preparePlanData(rawPlan, stats);
+                        state.result.data.preparedPlan =
+                            Object.keys(planData).length > 0 ? planData : undefined;
+                        state.result.data.simplifiedPlan = simplifiedPlan;
+                        state.result.data.plan = chunk.plan;
+                        state.result.data.stats = chunk.stats;
+                    }
+                }
+            }

-            if (isSessionChunk(chunk)) {
-                state.result.queryId = chunk.meta.query_id;
-                state.result.data.traceId = chunk.meta.trace_id;
-            } else if (isStreamDataChunk(chunk)) {
+            if (mergedStreamDataChunk) {
                 const {
                     result: {columns, rows},
                     meta: {result_index: resultIndex},
-                } = chunk;
+                } = mergedStreamDataChunk;

                 if (columns && !state.result.data.resultSets?.[resultIndex]?.columns?.length) {
                     const columnsWithIndex = [INDEX_COLUMN, ...columns];
@@ -184,23 +206,16 @@ const slice = createSlice({
                         state.result.data.resultSets[resultIndex].columns,
                     );

-                    // Update result set with all formatted rows at once
-                    state.result.data.resultSets[resultIndex].result = [
-                        ...(state.result.data.resultSets[resultIndex].result || []),
-                        ...formattedRows,
-                    ];
-                }
-            } else if (isQueryResponseChunk(chunk)) {
-                state.result.isLoading = false;
-
-                if (state.result.data) {
-                    const {plan: rawPlan, stats} = chunk;
-                    const {simplifiedPlan, ...planData} = preparePlanData(rawPlan, stats);
-                    state.result.data.preparedPlan =
-                        Object.keys(planData).length > 0 ? planData : undefined;
-                    state.result.data.simplifiedPlan = simplifiedPlan;
-                    state.result.data.plan = chunk.plan;
-                    state.result.data.stats = chunk.stats;
+                    // Update result set by pushing formatted rows directly
+                    if (!state.result.data.resultSets[resultIndex].result) {
+                        state.result.data.resultSets[resultIndex].result = [];
+                    }
+
+                    const currentLength = state.result.data.resultSets[resultIndex].result.length;
+                    for (let i = formattedRows.length - 1; i >= 0; i--) {
+                        state.result.data.resultSets[resultIndex].result[currentLength + i] =
+                            formattedRows[i];
+                    }
                 }
             }
         },
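
The rewritten reducer above does two things per dispatched batch: it folds every StreamDataChunk into one merged chunk, then appends the formatted rows with indexed writes instead of rebuilding the accumulated array. A sketch of that append strategy in isolation (the row type and helper are hypothetical stand-ins for the repository's types):

    type KeyValueRow = Record<string, unknown>;

    function appendRows(target: KeyValueRow[], formattedRows: KeyValueRow[]): void {
        const currentLength = target.length;
        // Writing the highest index first lets the engine size the array once;
        // the reducer's reverse loop presumably exists for the same reason
        for (let i = formattedRows.length - 1; i >= 0; i--) {
            target[currentLength + i] = formattedRows[i];
        }
    }

The spread version it replaces (result = [...(result || []), ...formattedRows]) copies every previously accumulated row on each batch, so the cost grows quadratically over a long stream; indexed assignment touches only the new rows.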
@@ -233,7 +248,7 @@ export const {
     goToNextQuery,
     setTenantPath,
     setQueryHistoryFilter,
-    addStreamingChunk,
+    addStreamingChunks,
 } = slice.actions;

 export const {
@@ -277,6 +292,17 @@ export const queryApi = api.injectEndpoints({
             );

             try {
+                let chunkBatch: StreamingChunk[] = [];
+                let batchTimeout: number | null = null;
+
+                const flushBatch = () => {
+                    if (chunkBatch.length > 0) {
+                        dispatch(addStreamingChunks(chunkBatch));
+                        chunkBatch = [];
+                    }
+                    batchTimeout = null;
+                };
+
                 await window.api.streaming.streamQuery(
                     {
                         query,
@@ -302,11 +328,20 @@ export const queryApi = api.injectEndpoints({
                     {
                         signal,
                         onChunk: (chunk) => {
-                            dispatch(addStreamingChunk(chunk));
+                            chunkBatch.push(chunk);
+                            if (!batchTimeout) {
+                                batchTimeout = window.requestAnimationFrame(flushBatch);
+                            }
                         },
                     },
                 );

+                // Flush any remaining chunks
+                if (batchTimeout) {
+                    window.cancelAnimationFrame(batchTimeout);
+                    flushBatch();
+                }
+
                 return {data: null};
             } catch (error) {
                 dispatch(
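
The endpoint now batches on the consumer side: the first chunk after a flush schedules a requestAnimationFrame callback, later chunks piggyback on it, and addStreamingChunks is dispatched at most once per frame however fast chunks arrive. The same pattern extracted as a generic sketch (names are illustrative, not part of the codebase):

    function createFrameBatcher<T>(deliver: (items: T[]) => void) {
        let batch: T[] = [];
        let rafId: number | null = null;

        const flush = () => {
            if (batch.length > 0) {
                deliver(batch);
                batch = [];
            }
            rafId = null;
        };

        return {
            push(item: T) {
                batch.push(item);
                if (rafId === null) {
                    // First item since the last flush: schedule one delivery per frame
                    rafId = window.requestAnimationFrame(flush);
                }
            },
            // Call once the stream ends so trailing items are not dropped
            finish() {
                if (rafId !== null) {
                    window.cancelAnimationFrame(rafId);
                }
                flush();
            },
        };
    }

Capping dispatches at the display refresh rate turns thousands of tiny store updates (and the React re-renders they trigger) into a handful of large ones, which matches the commit's stated goal of fixing performance issues.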

src/utils/StreamDataBatcher.ts

Lines changed: 0 additions & 61 deletions
This file was deleted.
