Skip to content

Commit 50cc0bd

Browse files
committed
fix: reducers
1 parent 75477a1 commit 50cc0bd

File tree

2 files changed

+170
-153
lines changed

2 files changed

+170
-153
lines changed

src/store/reducers/query/query.ts

Lines changed: 10 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,19 @@ import {settingsManager} from '../../../services/settings';
77
import type {ColumnType} from '../../../types/api/query';
88
import {TracingLevelNumber} from '../../../types/api/query';
99
import type {QueryAction, QueryRequestParams, QuerySettings} from '../../../types/store/query';
10-
import type {
11-
QueryResponseChunk,
12-
SessionChunk,
13-
StreamDataChunk,
14-
} from '../../../types/store/streaming';
10+
import type {StreamDataChunk} from '../../../types/store/streaming';
1511
import {QUERIES_HISTORY_KEY} from '../../../utils/constants';
16-
import {isQueryErrorResponse, parseResult} from '../../../utils/query';
12+
import {isQueryErrorResponse} from '../../../utils/query';
1713
import {isNumeric} from '../../../utils/utils';
1814
import type {RootState} from '../../defaultStore';
1915
import {api} from '../api';
2016

21-
import {preparePlanData} from './preparePlanData';
2217
import {prepareQueryData} from './prepareQueryData';
18+
import {
19+
addStreamingChunks as addStreamingChunksReducer,
20+
setStreamQueryResponse as setStreamQueryResponseReducer,
21+
setStreamSession as setStreamSessionReducer,
22+
} from './streamingReducers';
2323
import type {QueryResult, QueryState} from './types';
2424
import {getActionAndSyntaxFromQueryMode, getQueryInHistory} from './utils';
2525

@@ -128,152 +128,9 @@ const slice = createSlice({
128128
setQueryHistoryFilter: (state, action: PayloadAction<string>) => {
129129
state.history.filter = action.payload;
130130
},
131-
setStreamSession: (state, action: PayloadAction<SessionChunk>) => {
132-
if (!state.result) {
133-
return;
134-
}
135-
136-
if (!state.result.data) {
137-
state.result.data = prepareQueryData(null);
138-
}
139-
140-
const chunk = action.payload;
141-
state.result.isLoading = true;
142-
state.result.queryId = chunk.meta.query_id;
143-
state.result.data.traceId = chunk.meta.trace_id;
144-
},
145-
addStreamingChunks: (state, action: PayloadAction<StreamDataChunk[]>) => {
146-
if (!state.result) {
147-
return;
148-
}
149-
150-
if (!state.result.data) {
151-
state.result.data = prepareQueryData(null);
152-
}
153-
154-
// Initialize speed metrics if not present
155-
if (!state.result.speedMetrics) {
156-
state.result.speedMetrics = {
157-
rowsPerSecond: 0,
158-
lastUpdateTime: Date.now(),
159-
recentChunks: [],
160-
};
161-
}
162-
163-
const currentTime = Date.now();
164-
let totalNewRows = 0;
165-
166-
const mergedStreamDataChunks = new Map<number, StreamDataChunk>();
167-
for (const chunk of action.payload) {
168-
const currentMergedChunk = mergedStreamDataChunks.get(chunk.meta.result_index);
169-
const chunkRowCount = (chunk.result.rows || []).length;
170-
totalNewRows += chunkRowCount;
171-
172-
if (currentMergedChunk) {
173-
if (!currentMergedChunk.result.rows) {
174-
currentMergedChunk.result.rows = [];
175-
}
176-
for (const row of chunk.result.rows || []) {
177-
currentMergedChunk.result.rows.push(row);
178-
}
179-
} else {
180-
mergedStreamDataChunks.set(chunk.meta.result_index, chunk);
181-
}
182-
}
183-
184-
// Update speed metrics
185-
const metrics = state.result.speedMetrics;
186-
metrics.recentChunks.push({
187-
timestamp: currentTime,
188-
rowCount: totalNewRows,
189-
});
190-
191-
// Keep only chunks from the last 5 seconds
192-
const WINDOW_SIZE = 5000; // 5 seconds in milliseconds
193-
metrics.recentChunks = metrics.recentChunks.filter(
194-
(chunk) => currentTime - chunk.timestamp <= WINDOW_SIZE,
195-
);
196-
197-
// Calculate moving average
198-
if (metrics.recentChunks.length > 0) {
199-
const oldestChunkTime = metrics.recentChunks[0].timestamp;
200-
const timeWindow = (currentTime - oldestChunkTime) / 1000; // Convert to seconds
201-
const totalRows = metrics.recentChunks.reduce(
202-
(sum, chunk) => sum + chunk.rowCount,
203-
0,
204-
);
205-
metrics.rowsPerSecond = timeWindow > 0 ? totalRows / timeWindow : 0;
206-
}
207-
208-
metrics.lastUpdateTime = currentTime;
209-
210-
if (!state.result.data.resultSets) {
211-
state.result.data.resultSets = [];
212-
}
213-
214-
for (const [resultIndex, chunk] of mergedStreamDataChunks.entries()) {
215-
const {columns, rows} = chunk.result;
216-
217-
if (!state.result.data.resultSets[resultIndex]) {
218-
state.result.data.resultSets[resultIndex] = {
219-
columns: [],
220-
result: [],
221-
};
222-
}
223-
224-
if (columns && !state.result.data.resultSets[resultIndex].columns?.length) {
225-
state.result.data.resultSets[resultIndex].columns?.push(INDEX_COLUMN);
226-
for (const column of columns) {
227-
state.result.data.resultSets[resultIndex].columns?.push(column);
228-
}
229-
}
230-
231-
const indexedRows = rows || [];
232-
const startIndex =
233-
state.result?.data?.resultSets?.[resultIndex].result?.length || 1;
234-
235-
indexedRows.forEach((row, index) => {
236-
row.unshift(startIndex + index);
237-
});
238-
239-
const formattedRows = parseResult(
240-
indexedRows,
241-
state.result.data.resultSets[resultIndex].columns || [],
242-
);
243-
244-
for (const row of formattedRows) {
245-
state.result.data.resultSets[resultIndex].result?.push(row);
246-
}
247-
}
248-
},
249-
setStreamQueryResponse: (state, action: PayloadAction<QueryResponseChunk>) => {
250-
if (!state.result) {
251-
return;
252-
}
253-
254-
if (!state.result.data) {
255-
state.result.data = prepareQueryData(null);
256-
}
257-
258-
state.result.isLoading = false;
259-
260-
const chunk = action.payload;
261-
if ('error' in chunk) {
262-
state.result.error = chunk;
263-
} else if ('plan' in chunk) {
264-
if (!state.result.data) {
265-
state.result.data = prepareQueryData(null);
266-
}
267-
268-
const {plan: rawPlan, stats} = chunk;
269-
const {simplifiedPlan, ...planData} = preparePlanData(rawPlan, stats);
270-
state.result.data.preparedPlan =
271-
Object.keys(planData).length > 0 ? planData : undefined;
272-
state.result.data.simplifiedPlan = simplifiedPlan;
273-
state.result.data.plan = chunk.plan;
274-
state.result.data.stats = chunk.stats;
275-
}
276-
},
131+
setStreamSession: setStreamSessionReducer,
132+
addStreamingChunks: addStreamingChunksReducer,
133+
setStreamQueryResponse: setStreamQueryResponseReducer,
277134
},
278135
selectors: {
279136
selectQueriesHistoryFilter: (state) => state.history.filter || '',
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import type {PayloadAction} from '@reduxjs/toolkit';
2+
3+
import type {
4+
QueryResponseChunk,
5+
SessionChunk,
6+
StreamDataChunk,
7+
} from '../../../types/store/streaming';
8+
import {parseResult} from '../../../utils/query';
9+
10+
import {preparePlanData} from './preparePlanData';
11+
import {prepareQueryData} from './prepareQueryData';
12+
import {INDEX_COLUMN} from './query';
13+
import type {QueryState} from './types';
14+
15+
export const setStreamSession = (state: QueryState, action: PayloadAction<SessionChunk>) => {
16+
if (!state.result) {
17+
return;
18+
}
19+
20+
if (!state.result.data) {
21+
state.result.data = prepareQueryData(null);
22+
}
23+
24+
const chunk = action.payload;
25+
state.result.isLoading = true;
26+
state.result.queryId = chunk.meta.query_id;
27+
state.result.data.traceId = chunk.meta.trace_id;
28+
};
29+
30+
export const setStreamQueryResponse = (
31+
state: QueryState,
32+
action: PayloadAction<QueryResponseChunk>,
33+
) => {
34+
if (!state.result) {
35+
return;
36+
}
37+
38+
if (!state.result.data) {
39+
state.result.data = prepareQueryData(null);
40+
}
41+
42+
state.result.isLoading = false;
43+
44+
const chunk = action.payload;
45+
if ('error' in chunk) {
46+
state.result.error = chunk;
47+
} else if ('plan' in chunk) {
48+
if (!state.result.data) {
49+
state.result.data = prepareQueryData(null);
50+
}
51+
52+
const {plan: rawPlan, stats} = chunk;
53+
const {simplifiedPlan, ...planData} = preparePlanData(rawPlan, stats);
54+
state.result.data.preparedPlan = Object.keys(planData).length > 0 ? planData : undefined;
55+
state.result.data.simplifiedPlan = simplifiedPlan;
56+
state.result.data.plan = chunk.plan;
57+
state.result.data.stats = chunk.stats;
58+
}
59+
};
60+
61+
export const addStreamingChunks = (state: QueryState, action: PayloadAction<StreamDataChunk[]>) => {
62+
if (!state.result) {
63+
return;
64+
}
65+
66+
if (!state.result.data) {
67+
state.result.data = prepareQueryData(null);
68+
}
69+
70+
// Initialize speed metrics if not present
71+
if (!state.result.speedMetrics) {
72+
state.result.speedMetrics = {
73+
rowsPerSecond: 0,
74+
lastUpdateTime: Date.now(),
75+
recentChunks: [],
76+
};
77+
}
78+
79+
const currentTime = Date.now();
80+
let totalNewRows = 0;
81+
82+
const mergedStreamDataChunks = new Map<number, StreamDataChunk>();
83+
for (const chunk of action.payload) {
84+
const currentMergedChunk = mergedStreamDataChunks.get(chunk.meta.result_index);
85+
const chunkRowCount = (chunk.result.rows || []).length;
86+
totalNewRows += chunkRowCount;
87+
88+
if (currentMergedChunk) {
89+
if (!currentMergedChunk.result.rows) {
90+
currentMergedChunk.result.rows = [];
91+
}
92+
for (const row of chunk.result.rows || []) {
93+
currentMergedChunk.result.rows.push(row);
94+
}
95+
} else {
96+
mergedStreamDataChunks.set(chunk.meta.result_index, chunk);
97+
}
98+
}
99+
100+
// Update speed metrics
101+
const metrics = state.result.speedMetrics;
102+
metrics.recentChunks.push({
103+
timestamp: currentTime,
104+
rowCount: totalNewRows,
105+
});
106+
107+
// Keep only chunks from the last 5 seconds
108+
const WINDOW_SIZE = 5000; // 5 seconds in milliseconds
109+
metrics.recentChunks = metrics.recentChunks.filter(
110+
(chunk) => currentTime - chunk.timestamp <= WINDOW_SIZE,
111+
);
112+
113+
// Calculate moving average
114+
if (metrics.recentChunks.length > 0) {
115+
const oldestChunkTime = metrics.recentChunks[0].timestamp;
116+
const timeWindow = (currentTime - oldestChunkTime) / 1000; // Convert to seconds
117+
const totalRows = metrics.recentChunks.reduce((sum, chunk) => sum + chunk.rowCount, 0);
118+
metrics.rowsPerSecond = timeWindow > 0 ? totalRows / timeWindow : 0;
119+
}
120+
121+
metrics.lastUpdateTime = currentTime;
122+
123+
if (!state.result.data.resultSets) {
124+
state.result.data.resultSets = [];
125+
}
126+
127+
for (const [resultIndex, chunk] of mergedStreamDataChunks.entries()) {
128+
const {columns, rows} = chunk.result;
129+
130+
if (!state.result.data.resultSets[resultIndex]) {
131+
state.result.data.resultSets[resultIndex] = {
132+
columns: [],
133+
result: [],
134+
};
135+
}
136+
137+
if (columns && !state.result.data.resultSets[resultIndex].columns?.length) {
138+
state.result.data.resultSets[resultIndex].columns?.push(INDEX_COLUMN);
139+
for (const column of columns) {
140+
state.result.data.resultSets[resultIndex].columns?.push(column);
141+
}
142+
}
143+
144+
const indexedRows = rows || [];
145+
const startIndex = state.result?.data?.resultSets?.[resultIndex].result?.length || 1;
146+
147+
indexedRows.forEach((row, index) => {
148+
row.unshift(startIndex + index);
149+
});
150+
151+
const formattedRows = parseResult(
152+
indexedRows,
153+
state.result.data.resultSets[resultIndex].columns || [],
154+
);
155+
156+
for (const row of formattedRows) {
157+
state.result.data.resultSets[resultIndex].result?.push(row);
158+
}
159+
}
160+
};

0 commit comments

Comments
 (0)