Commit 590dccd

fix: speed up
1 parent 327fe22 commit 590dccd

File tree

1 file changed: +58 -74 lines changed


src/store/reducers/query/streamingReducers.ts

Lines changed: 58 additions & 74 deletions
@@ -58,103 +58,87 @@ export const setStreamQueryResponse = (
     }
 };
 
-export const addStreamingChunks = (state: QueryState, action: PayloadAction<StreamDataChunk[]>) => {
-    if (!state.result) {
-        return;
-    }
-
-    if (!state.result.data) {
-        state.result.data = prepareQueryData(null);
-    }
-
-    // Initialize speed metrics if not present
-    if (!state.result.speedMetrics) {
-        state.result.speedMetrics = {
-            rowsPerSecond: 0,
-            lastUpdateTime: Date.now(),
-            recentChunks: [],
-        };
-    }
+type SpeedMetrics = NonNullable<NonNullable<QueryState['result']>['speedMetrics']>;
 
+const updateSpeedMetrics = (metrics: SpeedMetrics, totalNewRows: number) => {
     const currentTime = Date.now();
-    let totalNewRows = 0;
-
-    const mergedStreamDataChunks = new Map<number, StreamDataChunk>();
-    for (const chunk of action.payload) {
-        const currentMergedChunk = mergedStreamDataChunks.get(chunk.meta.result_index);
-        const chunkRowCount = (chunk.result.rows || []).length;
-        totalNewRows += chunkRowCount;
-
-        if (currentMergedChunk) {
-            if (!currentMergedChunk.result.rows) {
-                currentMergedChunk.result.rows = [];
-            }
-            for (const row of chunk.result.rows || []) {
-                currentMergedChunk.result.rows.push(row);
-            }
-        } else {
-            mergedStreamDataChunks.set(chunk.meta.result_index, chunk);
-        }
-    }
-
-    // Update speed metrics
-    const metrics = state.result.speedMetrics;
-    metrics.recentChunks.push({
-        timestamp: currentTime,
-        rowCount: totalNewRows,
-    });
-
-    // Keep only chunks from the last 5 seconds
     const WINDOW_SIZE = 5000; // 5 seconds in milliseconds
+
+    metrics.recentChunks.push({timestamp: currentTime, rowCount: totalNewRows});
     metrics.recentChunks = metrics.recentChunks.filter(
         (chunk) => currentTime - chunk.timestamp <= WINDOW_SIZE,
     );
 
-    // Calculate moving average
     if (metrics.recentChunks.length > 0) {
         const oldestChunkTime = metrics.recentChunks[0].timestamp;
-        const timeWindow = (currentTime - oldestChunkTime) / 1000; // Convert to seconds
-        const totalRows = metrics.recentChunks.reduce((sum, chunk) => sum + chunk.rowCount, 0);
+        const timeWindow = (currentTime - oldestChunkTime) / 1000;
+        const totalRows = metrics.recentChunks.reduce(
+            (sum: number, chunk) => sum + chunk.rowCount,
+            0,
+        );
         metrics.rowsPerSecond = timeWindow > 0 ? totalRows / timeWindow : 0;
     }
 
     metrics.lastUpdateTime = currentTime;
+};
 
-    if (!state.result.data.resultSets) {
-        state.result.data.resultSets = [];
+export const addStreamingChunks = (state: QueryState, action: PayloadAction<StreamDataChunk[]>) => {
+    if (!state.result) {
+        return;
     }
 
-    for (const [resultIndex, chunk] of mergedStreamDataChunks.entries()) {
-        const {columns, rows} = chunk.result;
+    state.result.data = state.result.data || prepareQueryData(null);
+    state.result.speedMetrics = state.result.speedMetrics || {
+        rowsPerSecond: 0,
+        lastUpdateTime: Date.now(),
+        recentChunks: [],
+    };
+    state.result.data.resultSets = state.result.data.resultSets || [];
 
-        if (!state.result.data.resultSets[resultIndex]) {
-            state.result.data.resultSets[resultIndex] = {
-                columns: [],
-                result: [],
-            };
-        }
+    // Merge chunks by result index
+    const mergedChunks = action.payload.reduce((acc: Map<number, StreamDataChunk>, chunk) => {
+        const resultIndex = chunk.meta.result_index;
+        const currentMergedChunk = acc.get(resultIndex);
 
-        if (columns && !state.result.data.resultSets[resultIndex].columns?.length) {
-            state.result.data.resultSets[resultIndex].columns?.push(INDEX_COLUMN);
-            for (const column of columns) {
-                state.result.data.resultSets[resultIndex].columns?.push(column);
-            }
+        if (currentMergedChunk) {
+            currentMergedChunk.result.rows?.push(...(chunk.result.rows || []));
+        } else {
+            acc.set(resultIndex, {
+                ...chunk,
+                result: {...chunk.result, rows: chunk.result.rows || []},
+            });
         }
+        return acc;
+    }, new Map<number, StreamDataChunk>());
 
-        const indexedRows = rows || [];
-        const startIndex = state.result?.data?.resultSets?.[resultIndex].result?.length || 1;
+    const totalNewRows = action.payload.reduce(
+        (sum: number, chunk) => sum + (chunk.result.rows?.length || 0),
+        0,
+    );
 
-        indexedRows.forEach((row, index) => {
-            row.unshift(startIndex + index);
-        });
+    if (state.result.speedMetrics) {
+        updateSpeedMetrics(state.result.speedMetrics, totalNewRows);
+    }
 
-        const formattedRows = parseResult(
-            indexedRows,
-            state.result.data.resultSets[resultIndex].columns || [],
-        );
+    // Process merged chunks
+    for (const [resultIndex, chunk] of mergedChunks.entries()) {
+        const {columns, rows} = chunk.result;
+        const resultSet = (state.result.data.resultSets[resultIndex] = state.result.data.resultSets[
+            resultIndex
+        ] || {
+            columns: [],
+            result: [],
+        });
 
-        for (const row of formattedRows) {
-            state.result.data.resultSets[resultIndex].result?.push(row);
+        if (columns && !resultSet.columns?.length) {
+            resultSet.columns = [INDEX_COLUMN, ...columns];
         }
+
+        const startIndex = resultSet.result?.length || 1;
+        const safeRows = rows || [];
+        const indexedRows = safeRows.map((row, index) => [startIndex + index, ...row]);
+        const formattedRows = parseResult(indexedRows, resultSet.columns || []);
+
+        resultSet.result = [...(resultSet.result || []), ...formattedRows];
     }
 };
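
For context, here is a minimal standalone sketch of the 5-second sliding-window average that the commit factors out into updateSpeedMetrics. The SpeedMetrics shape below mirrors the reducer's; the driver at the end is hypothetical and only illustrates the expected output.

// Standalone TypeScript sketch (not part of this commit) of the
// sliding-window throughput calculation used by the reducer.
interface ChunkStat {
    timestamp: number; // ms timestamp of when the batch arrived
    rowCount: number; // rows delivered in that batch
}

interface SpeedMetrics {
    rowsPerSecond: number;
    lastUpdateTime: number;
    recentChunks: ChunkStat[];
}

const WINDOW_SIZE = 5000; // 5 seconds in milliseconds

function updateSpeedMetrics(metrics: SpeedMetrics, totalNewRows: number): void {
    const currentTime = Date.now();

    // Record the new batch, then drop batches older than the window.
    metrics.recentChunks.push({timestamp: currentTime, rowCount: totalNewRows});
    metrics.recentChunks = metrics.recentChunks.filter(
        (chunk) => currentTime - chunk.timestamp <= WINDOW_SIZE,
    );

    // Average rows over the span from the oldest retained batch to now.
    if (metrics.recentChunks.length > 0) {
        const oldestChunkTime = metrics.recentChunks[0].timestamp;
        const timeWindow = (currentTime - oldestChunkTime) / 1000; // seconds
        const totalRows = metrics.recentChunks.reduce((sum, chunk) => sum + chunk.rowCount, 0);
        metrics.rowsPerSecond = timeWindow > 0 ? totalRows / timeWindow : 0;
    }

    metrics.lastUpdateTime = currentTime;
}

// Hypothetical driver: two batches roughly one second apart.
const metrics: SpeedMetrics = {rowsPerSecond: 0, lastUpdateTime: Date.now(), recentChunks: []};
updateSpeedMetrics(metrics, 1000);
setTimeout(() => {
    updateSpeedMetrics(metrics, 500);
    console.log(metrics.rowsPerSecond); // ~1500 rows over ~1 s => ~1500 rows/s
}, 1000);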

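The merge step in the new reducer groups incoming chunks by meta.result_index so each result set is appended to once per dispatched batch. A minimal standalone sketch of that grouping follows, with a simplified chunk shape; the real StreamDataChunk type carries more metadata, and copying the rows on first insertion is an assumption made here so the sketch never mutates its input.

// Simplified chunk shape for illustration only.
interface Chunk {
    meta: {result_index: number};
    result: {rows?: unknown[][]};
}

// Group chunks by result_index, concatenating their rows.
function mergeChunks(chunks: Chunk[]): Map<number, Chunk> {
    return chunks.reduce((acc: Map<number, Chunk>, chunk) => {
        const resultIndex = chunk.meta.result_index;
        const merged = acc.get(resultIndex);

        if (merged) {
            merged.result.rows?.push(...(chunk.result.rows || []));
        } else {
            // Copy on first insertion so later pushes don't touch the input.
            acc.set(resultIndex, {
                ...chunk,
                result: {...chunk.result, rows: [...(chunk.result.rows || [])]},
            });
        }
        return acc;
    }, new Map<number, Chunk>());
}

// Hypothetical usage: three chunks spanning two result sets.
const merged = mergeChunks([
    {meta: {result_index: 0}, result: {rows: [[1], [2]]}},
    {meta: {result_index: 1}, result: {rows: [[10]]}},
    {meta: {result_index: 0}, result: {rows: [[3]]}},
]);
console.log(merged.get(0)?.result.rows); // [[1], [2], [3]]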