@@ -142,7 +142,7 @@ const slice = createSlice({
             state.result.queryId = chunk.meta.query_id;
             state.result.data.traceId = chunk.meta.trace_id;
         },
-        addStreamingChunk: (state, action: PayloadAction<StreamDataChunk>) => {
+        addStreamingChunks: (state, action: PayloadAction<StreamDataChunk[]>) => {
             if (!state.result) {
                 return;
             }
@@ -151,6 +151,7 @@ const slice = createSlice({
                 state.result.data = prepareQueryData(null);
             }

+            // Initialize speed metrics if not present
             if (!state.result.speedMetrics) {
                 state.result.speedMetrics = {
                     rowsPerSecond: 0,
@@ -160,27 +161,45 @@ const slice = createSlice({
             }

             const currentTime = Date.now();
-            const chunk = action.payload;
+            let totalNewRows = 0;
+
+            const mergedStreamDataChunks = new Map<number, StreamDataChunk>();
+            for (const chunk of action.payload) {
+                const currentMergedChunk = mergedStreamDataChunks.get(chunk.meta.result_index);
+                const chunkRowCount = (chunk.result.rows || []).length;
+                totalNewRows += chunkRowCount;
+
+                if (currentMergedChunk) {
+                    if (!currentMergedChunk.result.rows) {
+                        currentMergedChunk.result.rows = [];
+                    }
+                    for (const row of chunk.result.rows || []) {
+                        currentMergedChunk.result.rows.push(row);
+                    }
+                } else {
+                    mergedStreamDataChunks.set(chunk.meta.result_index, chunk);
+                }
+            }

             // Update speed metrics
             const metrics = state.result.speedMetrics;
             metrics.recentChunks.push({
                 timestamp: currentTime,
-                rowCount: chunk.result.rows?.length || 0,
+                rowCount: totalNewRows,
             });

             // Keep only chunks from the last 5 seconds
             const WINDOW_SIZE = 5000; // 5 seconds in milliseconds
             metrics.recentChunks = metrics.recentChunks.filter(
-                (_chunk) => currentTime - _chunk.timestamp <= WINDOW_SIZE,
+                (chunk) => currentTime - chunk.timestamp <= WINDOW_SIZE,
             );

             // Calculate moving average
             if (metrics.recentChunks.length > 0) {
                 const oldestChunkTime = metrics.recentChunks[0].timestamp;
                 const timeWindow = (currentTime - oldestChunkTime) / 1000; // Convert to seconds
                 const totalRows = metrics.recentChunks.reduce(
-                    (sum, _chunk) => sum + _chunk.rowCount,
+                    (sum, chunk) => sum + chunk.rowCount,
                     0,
                 );
                 metrics.rowsPerSecond = timeWindow > 0 ? totalRows / timeWindow : 0;
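Two things happen in the hunk above: chunks in a batch that target the same result set are merged by `result_index` so each result set is touched once, and the throughput estimate becomes a moving average over a 5-second window. A minimal standalone sketch of both, with simplified types and hypothetical helper names (not part of the actual module):

```ts
// Simplified stand-in for StreamDataChunk: only the fields used above.
interface ChunkLike {
    meta: {result_index: number};
    result: {rows?: unknown[][]};
}

// Merge a batch keyed by result_index, as the reducer's Map does.
function mergeByResultIndex(chunks: ChunkLike[]): Map<number, ChunkLike> {
    const merged = new Map<number, ChunkLike>();
    for (const chunk of chunks) {
        const existing = merged.get(chunk.meta.result_index);
        if (existing) {
            existing.result.rows = (existing.result.rows ?? []).concat(chunk.result.rows ?? []);
        } else {
            merged.set(chunk.meta.result_index, chunk);
        }
    }
    return merged;
}

// Moving average: rows received inside the window, divided by the
// window's actual span in seconds (not the nominal 5 seconds).
function rowsPerSecond(
    samples: Array<{timestamp: number; rowCount: number}>,
    now: number,
    windowMs = 5000,
): number {
    const recent = samples.filter((s) => now - s.timestamp <= windowMs);
    if (recent.length === 0) {
        return 0;
    }
    const seconds = (now - recent[0].timestamp) / 1000;
    const total = recent.reduce((sum, s) => sum + s.rowCount, 0);
    return seconds > 0 ? total / seconds : 0;
}
```

Note the divisor is the span between the oldest retained sample and now, so the estimate stays meaningful right after the stream starts, before a full five seconds of samples exist.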
@@ -191,38 +210,40 @@ const slice = createSlice({
             if (!state.result.data.resultSets) {
                 state.result.data.resultSets = [];
             }
-            const resultIndex = chunk.meta.result_index;

-            const {columns, rows} = chunk.result;
+            for (const [resultIndex, chunk] of mergedStreamDataChunks.entries()) {
+                const {columns, rows} = chunk.result;

-            if (!state.result.data.resultSets[resultIndex]) {
-                state.result.data.resultSets[resultIndex] = {
-                    columns: [],
-                    result: [],
-                };
-            }
+                if (!state.result.data.resultSets[resultIndex]) {
+                    state.result.data.resultSets[resultIndex] = {
+                        columns: [],
+                        result: [],
+                    };
+                }

-            if (columns && !state.result.data.resultSets[resultIndex].columns?.length) {
-                state.result.data.resultSets[resultIndex].columns?.push(INDEX_COLUMN);
-                for (const column of columns) {
-                    state.result.data.resultSets[resultIndex].columns?.push(column);
+                if (columns && !state.result.data.resultSets[resultIndex].columns?.length) {
+                    state.result.data.resultSets[resultIndex].columns?.push(INDEX_COLUMN);
+                    for (const column of columns) {
+                        state.result.data.resultSets[resultIndex].columns?.push(column);
+                    }
                 }
-            }

-            const indexedRows = rows || [];
-            const startIndex = state.result?.data?.resultSets?.[resultIndex].result?.length || 1;
+                const indexedRows = rows || [];
+                const startIndex =
+                    state.result?.data?.resultSets?.[resultIndex].result?.length || 1;

-            indexedRows.forEach((row, index) => {
-                row.unshift(startIndex + index);
-            });
+                indexedRows.forEach((row, index) => {
+                    row.unshift(startIndex + index);
+                });

-            const formattedRows = parseResult(
-                indexedRows,
-                state.result.data.resultSets[resultIndex].columns || [],
-            );
+                const formattedRows = parseResult(
+                    indexedRows,
+                    state.result.data.resultSets[resultIndex].columns || [],
+                );

-            for (const row of formattedRows) {
-                state.result.data.resultSets[resultIndex].result?.push(row);
+                for (const row of formattedRows) {
+                    state.result.data.resultSets[resultIndex].result?.push(row);
+                }
             }
         },
         setStreamQueryResponse: (state, action: PayloadAction<QueryResponseChunk>) => {
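The loop above appends each merged chunk to its result set: the first chunk carrying columns also gets the synthetic `INDEX_COLUMN` prepended, and every row is prefixed with a running display index before being formatted. Condensed into a hypothetical helper (simplified types, and `parseResult`, which formats raw cells against the column metadata, is elided):

```ts
// Sketch of the per-result-set append step (illustrative only).
interface ResultSetLike {
    columns: unknown[];
    result: unknown[][];
}

function appendRows(resultSet: ResultSetLike, rows: unknown[][]): void {
    // Continue numbering from the rows already accumulated;
    // the first batch starts at 1 (length 0 is falsy).
    const startIndex = resultSet.result.length || 1;
    rows.forEach((row, index) => {
        row.unshift(startIndex + index);
    });
    for (const row of rows) {
        resultSet.result.push(row);
    }
}
```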
@@ -281,7 +302,7 @@ export const {
     goToNextQuery,
     setTenantPath,
     setQueryHistoryFilter,
-    addStreamingChunk,
+    addStreamingChunks,
     setStreamQueryResponse,
     setStreamSession,
 } = slice.actions;
@@ -327,6 +348,17 @@ export const queryApi = api.injectEndpoints({
                 );

                 try {
+                    let streamDataChunkBatch: StreamDataChunk[] = [];
+                    let batchTimeout: number | null = null;
+
+                    const flushBatch = () => {
+                        if (streamDataChunkBatch.length > 0) {
+                            dispatch(addStreamingChunks(streamDataChunkBatch));
+                            streamDataChunkBatch = [];
+                        }
+                        batchTimeout = null;
+                    };
+
                     await window.api.streaming.streamQuery(
                         {
                             query,
@@ -358,11 +390,20 @@ export const queryApi = api.injectEndpoints({
                                 dispatch(setStreamSession(chunk));
                             },
                             onStreamDataChunk: (chunk) => {
-                                dispatch(addStreamingChunk(chunk));
+                                streamDataChunkBatch.push(chunk);
+                                if (!batchTimeout) {
+                                    batchTimeout = window.requestAnimationFrame(flushBatch);
+                                }
                             },
                         },
                     );

+                    // Flush any remaining chunks
+                    if (batchTimeout) {
+                        window.cancelAnimationFrame(batchTimeout);
+                        flushBatch();
+                    }
+
                     return {data: null};
                 } catch (error) {
                     const state = getState() as RootState;
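The endpoint-side pattern above — buffer chunks as they arrive, flush at most once per animation frame via `requestAnimationFrame`, then cancel-plus-flush when the stream completes — generalizes beyond this endpoint. A self-contained sketch of the same idea (illustrative names, assuming a browser environment; not the module's actual API):

```ts
// A generic frame-batched consumer, mirroring the pattern above.
// Items are buffered as they arrive and handed to `flush` at most once
// per animation frame, so a fast producer cannot dispatch on every chunk.
function createFrameBatcher<T>(flush: (items: T[]) => void) {
    let buffer: T[] = [];
    let frameId: number | null = null;

    const run = () => {
        if (buffer.length > 0) {
            flush(buffer);
            buffer = [];
        }
        frameId = null;
    };

    return {
        push(item: T) {
            buffer.push(item);
            if (frameId === null) {
                frameId = window.requestAnimationFrame(run);
            }
        },
        // Cancel any pending frame and flush whatever is buffered,
        // e.g. when the stream completes.
        finish() {
            if (frameId !== null) {
                window.cancelAnimationFrame(frameId);
                frameId = null;
            }
            run();
        },
    };
}

// Usage, roughly as in the endpoint above (hypothetical wiring):
// const batcher = createFrameBatcher<StreamDataChunk>((chunks) =>
//     dispatch(addStreamingChunks(chunks)),
// );
// onStreamDataChunk: (chunk) => batcher.push(chunk);
// ...after the stream ends: batcher.finish();
```

Batching per animation frame caps Redux dispatches (and the re-renders they trigger) at the display's refresh rate, which is the point of this change.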