@@ -142,7 +142,7 @@ const slice = createSlice({
142142 state . result . queryId = chunk . meta . query_id ;
143143 state . result . data . traceId = chunk . meta . trace_id ;
144144 } ,
145- addStreamingChunks : ( state , action : PayloadAction < StreamDataChunk [ ] > ) => {
145+ addStreamingChunk : ( state , action : PayloadAction < StreamDataChunk > ) => {
146146 if ( ! state . result ) {
147147 return ;
148148 }
@@ -151,7 +151,6 @@ const slice = createSlice({
151151 state . result . data = prepareQueryData ( null ) ;
152152 }
153153
154- // Initialize speed metrics if not present
155154 if ( ! state . result . speedMetrics ) {
156155 state . result . speedMetrics = {
157156 rowsPerSecond : 0 ,
@@ -161,45 +160,27 @@ const slice = createSlice({
161160 }
162161
163162 const currentTime = Date . now ( ) ;
164- let totalNewRows = 0 ;
165-
166- const mergedStreamDataChunks = new Map < number , StreamDataChunk > ( ) ;
167- for ( const chunk of action . payload ) {
168- const currentMergedChunk = mergedStreamDataChunks . get ( chunk . meta . result_index ) ;
169- const chunkRowCount = ( chunk . result . rows || [ ] ) . length ;
170- totalNewRows += chunkRowCount ;
171-
172- if ( currentMergedChunk ) {
173- if ( ! currentMergedChunk . result . rows ) {
174- currentMergedChunk . result . rows = [ ] ;
175- }
176- for ( const row of chunk . result . rows || [ ] ) {
177- currentMergedChunk . result . rows . push ( row ) ;
178- }
179- } else {
180- mergedStreamDataChunks . set ( chunk . meta . result_index , chunk ) ;
181- }
182- }
163+ const chunk = action . payload ;
183164
184165 // Update speed metrics
185166 const metrics = state . result . speedMetrics ;
186167 metrics . recentChunks . push ( {
187168 timestamp : currentTime ,
188- rowCount : totalNewRows ,
169+ rowCount : chunk . result . rows ?. length || 0 ,
189170 } ) ;
190171
191172 // Keep only chunks from the last 5 seconds
192173 const WINDOW_SIZE = 5000 ; // 5 seconds in milliseconds
193174 metrics . recentChunks = metrics . recentChunks . filter (
194- ( chunk ) => currentTime - chunk . timestamp <= WINDOW_SIZE ,
175+ ( _chunk ) => currentTime - _chunk . timestamp <= WINDOW_SIZE ,
195176 ) ;
196177
197178 // Calculate moving average
198179 if ( metrics . recentChunks . length > 0 ) {
199180 const oldestChunkTime = metrics . recentChunks [ 0 ] . timestamp ;
200181 const timeWindow = ( currentTime - oldestChunkTime ) / 1000 ; // Convert to seconds
201182 const totalRows = metrics . recentChunks . reduce (
202- ( sum , chunk ) => sum + chunk . rowCount ,
183+ ( sum , _chunk ) => sum + _chunk . rowCount ,
203184 0 ,
204185 ) ;
205186 metrics . rowsPerSecond = timeWindow > 0 ? totalRows / timeWindow : 0 ;
@@ -210,40 +191,38 @@ const slice = createSlice({
210191 if ( ! state . result . data . resultSets ) {
211192 state . result . data . resultSets = [ ] ;
212193 }
194+ const resultIndex = chunk . meta . result_index ;
213195
214- for ( const [ resultIndex , chunk ] of mergedStreamDataChunks . entries ( ) ) {
215- const { columns, rows} = chunk . result ;
196+ const { columns, rows} = chunk . result ;
216197
217- if ( ! state . result . data . resultSets [ resultIndex ] ) {
218- state . result . data . resultSets [ resultIndex ] = {
219- columns : [ ] ,
220- result : [ ] ,
221- } ;
222- }
198+ if ( ! state . result . data . resultSets [ resultIndex ] ) {
199+ state . result . data . resultSets [ resultIndex ] = {
200+ columns : [ ] ,
201+ result : [ ] ,
202+ } ;
203+ }
223204
224- if ( columns && ! state . result . data . resultSets [ resultIndex ] . columns ?. length ) {
225- state . result . data . resultSets [ resultIndex ] . columns ?. push ( INDEX_COLUMN ) ;
226- for ( const column of columns ) {
227- state . result . data . resultSets [ resultIndex ] . columns ?. push ( column ) ;
228- }
205+ if ( columns && ! state . result . data . resultSets [ resultIndex ] . columns ?. length ) {
206+ state . result . data . resultSets [ resultIndex ] . columns ?. push ( INDEX_COLUMN ) ;
207+ for ( const column of columns ) {
208+ state . result . data . resultSets [ resultIndex ] . columns ?. push ( column ) ;
229209 }
210+ }
230211
231- const indexedRows = rows || [ ] ;
232- const startIndex =
233- state . result ?. data ?. resultSets ?. [ resultIndex ] . result ?. length || 1 ;
212+ const indexedRows = rows || [ ] ;
213+ const startIndex = state . result ?. data ?. resultSets ?. [ resultIndex ] . result ?. length || 1 ;
234214
235- indexedRows . forEach ( ( row , index ) => {
236- row . unshift ( startIndex + index ) ;
237- } ) ;
215+ indexedRows . forEach ( ( row , index ) => {
216+ row . unshift ( startIndex + index ) ;
217+ } ) ;
238218
239- const formattedRows = parseResult (
240- indexedRows ,
241- state . result . data . resultSets [ resultIndex ] . columns || [ ] ,
242- ) ;
219+ const formattedRows = parseResult (
220+ indexedRows ,
221+ state . result . data . resultSets [ resultIndex ] . columns || [ ] ,
222+ ) ;
243223
244- for ( const row of formattedRows ) {
245- state . result . data . resultSets [ resultIndex ] . result ?. push ( row ) ;
246- }
224+ for ( const row of formattedRows ) {
225+ state . result . data . resultSets [ resultIndex ] . result ?. push ( row ) ;
247226 }
248227 } ,
249228 setStreamQueryResponse : ( state , action : PayloadAction < QueryResponseChunk > ) => {
@@ -302,7 +281,7 @@ export const {
302281 goToNextQuery,
303282 setTenantPath,
304283 setQueryHistoryFilter,
305- addStreamingChunks ,
284+ addStreamingChunk ,
306285 setStreamQueryResponse,
307286 setStreamSession,
308287} = slice . actions ;
@@ -348,17 +327,6 @@ export const queryApi = api.injectEndpoints({
348327 ) ;
349328
350329 try {
351- let streamDataChunkBatch : StreamDataChunk [ ] = [ ] ;
352- let batchTimeout : number | null = null ;
353-
354- const flushBatch = ( ) => {
355- if ( streamDataChunkBatch . length > 0 ) {
356- dispatch ( addStreamingChunks ( streamDataChunkBatch ) ) ;
357- streamDataChunkBatch = [ ] ;
358- }
359- batchTimeout = null ;
360- } ;
361-
362330 await window . api . streaming . streamQuery (
363331 {
364332 query,
@@ -390,20 +358,11 @@ export const queryApi = api.injectEndpoints({
390358 dispatch ( setStreamSession ( chunk ) ) ;
391359 } ,
392360 onStreamDataChunk : ( chunk ) => {
393- streamDataChunkBatch . push ( chunk ) ;
394- if ( ! batchTimeout ) {
395- batchTimeout = window . requestAnimationFrame ( flushBatch ) ;
396- }
361+ dispatch ( addStreamingChunk ( chunk ) ) ;
397362 } ,
398363 } ,
399364 ) ;
400365
401- // Flush any remaining chunks
402- if ( batchTimeout ) {
403- window . cancelAnimationFrame ( batchTimeout ) ;
404- flushBatch ( ) ;
405- }
406-
407366 return { data : null } ;
408367 } catch ( error ) {
409368 const state = getState ( ) as RootState ;
0 commit comments