File tree Expand file tree Collapse file tree 1 file changed +29
-1
lines changed Expand file tree Collapse file tree 1 file changed +29
-1
lines changed Original file line number Diff line number Diff line change 11import type { PlanToSvgQueryParams } from '../../store/reducers/planToSvg' ;
2+ import {
3+ isQueryResponseChunk ,
4+ isSessionChunk ,
5+ isStreamDataChunk ,
6+ } from '../../store/reducers/query/utils' ;
27import type { TMetaInfo } from '../../types/api/acl' ;
38import type { TQueryAutocomplete } from '../../types/api/autocomplete' ;
49import type { CapabilitiesResponse } from '../../types/api/capabilities' ;
@@ -422,8 +427,31 @@ export class ViewerAPI extends BaseYdbAPI {
422427 state : parserState ,
423428 } ) ;
424429 parserState = state ;
430+ let mergedChunk = null ;
431+
425432 for ( const chunk of chunks ) {
426- onChunk ?.( chunk ) ;
433+ if ( isSessionChunk ( chunk ) || isQueryResponseChunk ( chunk ) ) {
434+ // First dispatch any accumulated data chunk
435+ if ( mergedChunk ) {
436+ onChunk ?.( mergedChunk ) ;
437+ mergedChunk = null ;
438+ }
439+ // Then dispatch control chunk
440+ onChunk ?.( chunk ) ;
441+ } else if ( isStreamDataChunk ( chunk ) ) {
442+ if ( mergedChunk ) {
443+ // Merge rows from subsequent chunks
444+ mergedChunk . result . rows . push ( ...chunk . result . rows ) ;
445+ } else {
446+ // First data chunk - use as base with columns
447+ mergedChunk = chunk ;
448+ }
449+ }
450+ }
451+
452+ // Dispatch any remaining merged chunk
453+ if ( mergedChunk ) {
454+ onChunk ?.( mergedChunk ) ;
427455 }
428456 } ,
429457 } ,
You can’t perform that action at this time.
0 commit comments