1+ // eslint-disable-next-line import/no-extraneous-dependencies
2+ import { parseMultipart } from '@mjackson/multipart-parser' ;
3+
14import {
25 isQueryResponseChunk ,
36 isSessionChunk ,
@@ -12,7 +15,6 @@ import type {
1215 StreamingChunk ,
1316} from '../../types/store/streaming' ;
1417import { BINARY_DATA_IN_PLAIN_TEXT_DISPLAY } from '../../utils/constants' ;
15- import { MultipartStreamParser } from '../parsers/parseMultipartStream' ;
1618import { settingsManager } from '../settings' ;
1719
1820import { BaseYdbAPI } from './base' ;
@@ -47,7 +49,7 @@ export class StreamingAPI extends BaseYdbAPI {
4749
4850 let traceId : string | undefined = '' ;
4951
50- const streamParser = new MultipartStreamParser ( ( chunk : StreamingChunk ) => {
52+ const handleChunk = ( chunk : StreamingChunk ) => {
5153 if ( isSessionChunk ( chunk ) ) {
5254 const sessionChunk = chunk ;
5355 sessionChunk . meta . trace_id = traceId ;
@@ -57,7 +59,7 @@ export class StreamingAPI extends BaseYdbAPI {
5759 } else if ( isQueryResponseChunk ( chunk ) ) {
5860 options . onQueryResponseChunk ( chunk ) ;
5961 }
60- } ) ;
62+ } ;
6163
6264 const queryParams = new URLSearchParams ( ) ;
6365 // Add only string/number params
@@ -91,19 +93,15 @@ export class StreamingAPI extends BaseYdbAPI {
9193 throw new Error ( 'Empty response body' ) ;
9294 }
9395
94- const reader = response . body . getReader ( ) ;
9596 traceId = response . headers . get ( 'traceresponse' ) ?. split ( '-' ) [ 1 ] ;
9697
97- try {
98- for ( ; ; ) {
99- const { done, value} = await reader . read ( ) ;
100- if ( done ) {
101- break ;
102- }
103- streamParser . processBuffer ( value ) ;
98+ await parseMultipart ( response . body , { boundary : 'boundary' } , async ( part ) => {
99+ try {
100+ const chunk = JSON . parse ( await part . text ( ) ) ;
101+ handleChunk ( chunk ) ;
102+ } catch ( e ) {
103+ throw new Error ( `Error parsing chunk: ${ e } ` ) ;
104104 }
105- } finally {
106- reader . releaseLock ( ) ;
107- }
105+ } ) ;
108106 }
109107}
0 commit comments