@@ -2,13 +2,19 @@ import {parseMultipart} from '@mjackson/multipart-parser';
22import qs from 'qs' ;
33
44import {
5+ isErrorChunk ,
56 isKeepAliveChunk ,
67 isQueryResponseChunk ,
78 isSessionChunk ,
89 isStreamDataChunk ,
910} from '../../store/reducers/query/utils' ;
1011import type { Actions , StreamQueryParams } from '../../types/api/query' ;
11- import type { QueryResponseChunk , SessionChunk , StreamDataChunk } from '../../types/store/streaming' ;
12+ import type {
13+ QueryResponseChunk ,
14+ SessionChunk ,
15+ StreamDataChunk ,
16+ StreamingChunk ,
17+ } from '../../types/store/streaming' ;
1218import {
1319 BINARY_DATA_IN_PLAIN_TEXT_DISPLAY ,
1420 DEV_ENABLE_TRACING_FOR_ALL_REQUESTS ,
@@ -94,24 +100,33 @@ export class StreamingAPI extends BaseYdbAPI {
94100 const traceId = response . headers . get ( 'traceresponse' ) ?. split ( '-' ) [ 1 ] ;
95101
96102 await parseMultipart ( response . body , { boundary : BOUNDARY } , async ( part ) => {
103+ const text = await part . text ( ) ;
104+
105+ let chunk : unknown ;
97106 try {
98- const chunk = JSON . parse ( await part . text ( ) ) ;
99-
100- if ( isSessionChunk ( chunk ) ) {
101- const sessionChunk = chunk ;
102- sessionChunk . meta . trace_id = traceId ;
103- options . onSessionChunk ( chunk ) ;
104- } else if ( isStreamDataChunk ( chunk ) ) {
105- options . onStreamDataChunk ( chunk ) ;
106- } else if ( isQueryResponseChunk ( chunk ) ) {
107- options . onQueryResponseChunk ( chunk ) ;
108- } else if ( isKeepAliveChunk ( chunk ) ) {
109- // Logging for debug purposes
110- console . info ( 'Received keep alive chunk' ) ;
111- }
107+ chunk = JSON . parse ( text ) ;
112108 } catch ( e ) {
113109 throw new Error ( `Error parsing chunk: ${ e } ` ) ;
114110 }
111+
112+ if ( isErrorChunk ( chunk ) ) {
113+ throw chunk ;
114+ }
115+
116+ const streamingChunk = chunk as unknown as StreamingChunk ;
117+
118+ if ( isSessionChunk ( streamingChunk ) ) {
119+ const sessionChunk = streamingChunk ;
120+ sessionChunk . meta . trace_id = traceId ;
121+ options . onSessionChunk ( streamingChunk ) ;
122+ } else if ( isStreamDataChunk ( streamingChunk ) ) {
123+ options . onStreamDataChunk ( streamingChunk ) ;
124+ } else if ( isQueryResponseChunk ( streamingChunk ) ) {
125+ options . onQueryResponseChunk ( streamingChunk ) ;
126+ } else if ( isKeepAliveChunk ( streamingChunk ) ) {
127+ // Logging for debug purposes
128+ console . info ( 'Received keep alive chunk' ) ;
129+ }
115130 } ) ;
116131 }
117132}
0 commit comments