Commit f2d6c1c

fix: streaming
1 parent bd7430b commit f2d6c1c

4 files changed: +95 −18 lines

src/services/api/streaming.ts

Lines changed: 18 additions & 2 deletions
@@ -1,7 +1,8 @@
-import {isSessionChunk} from '../../store/reducers/query/utils';
+import {isQueryResponseChunk, isSessionChunk} from '../../store/reducers/query/utils';
 import type {Actions, TracingLevel} from '../../types/api/query';
 import type {QuerySyntax, StatisticsMode} from '../../types/store/query';
 import type {StreamingChunk} from '../../types/store/streaming';
+import {StreamDataBatcher} from '../../utils/StreamDataBatcher';
 import {BINARY_DATA_IN_PLAIN_TEXT_DISPLAY} from '../../utils/constants';
 import {MultipartStreamParser} from '../parsers/parseMultipartStream';
 import {settingsManager} from '../settings';
@@ -35,13 +36,25 @@ export class StreamingAPI extends BaseYdbAPI {
         );

         let traceId: string | undefined = '';
+        const batcher = new StreamDataBatcher();

         const streamParser = new MultipartStreamParser((chunk: StreamingChunk) => {
             if (isSessionChunk(chunk)) {
                 const sessionChunk = chunk;
                 sessionChunk.meta.trace_id = traceId;
+                options.onChunk(chunk);
+            } else if (isQueryResponseChunk(chunk)) {
+                // Flush any batched chunks before sending response
+                const batchedChunks = batcher.flush();
+                batchedChunks.forEach((batchedChunk) => options.onChunk(batchedChunk));
+                options.onChunk(chunk);
+            } else {
+                // For stream data chunks, try to batch them
+                const batchedChunk = batcher.addChunk(chunk);
+                if (batchedChunk) {
+                    options.onChunk(batchedChunk);
+                }
             }
-            options.onChunk(chunk);
         });

         const queryParams = new URLSearchParams();
@@ -88,6 +101,9 @@ export class StreamingAPI extends BaseYdbAPI {
                 streamParser.processBuffer(value);
             }
         } finally {
+            // Flush any remaining batched chunks
+            const remainingChunks = batcher.flush();
+            remainingChunks.forEach((chunk) => options.onChunk(chunk));
             reader.releaseLock();
         }
     }

src/services/parsers/parseMultipartStream.ts

Lines changed: 6 additions & 6 deletions
@@ -96,11 +96,11 @@ export class MultipartStreamParser {
         if (boundaryPos === -1) {
             // Keep last boundary length bytes in case boundary is split between chunks
             if (this.buffer.length > this.boundaryBytes.length) {
-                const newBuffer = new Uint8Array(this.boundaryBytes.length);
-                newBuffer.set(
+                const partialBoundaryBuffer = new Uint8Array(this.boundaryBytes.length);
+                partialBoundaryBuffer.set(
                     this.buffer.slice(this.buffer.length - this.boundaryBytes.length),
                 );
-                this.buffer = newBuffer;
+                this.buffer = partialBoundaryBuffer;
             }
             this.bufferPos = 0;
             break;
@@ -134,9 +134,9 @@ export class MultipartStreamParser {

         // Compact buffer if we've processed some data
         if (this.bufferPos > 0) {
-            const newBuffer = new Uint8Array(this.buffer.length - this.bufferPos);
-            newBuffer.set(this.buffer.slice(this.bufferPos));
-            this.buffer = newBuffer;
+            const compactedBuffer = new Uint8Array(this.buffer.length - this.bufferPos);
+            compactedBuffer.set(this.buffer.slice(this.bufferPos));
+            this.buffer = compactedBuffer;
             this.bufferPos = 0;
         }
     }
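Note on the first hunk: it handles a multipart boundary that is split across two network reads. When no boundary is found in the current buffer, only the last boundaryBytes.length bytes are kept, so the next processBuffer call can still match a boundary that straddles the read border. The sketch below illustrates that idea in isolation; it is not the parser's actual code, and keepPossiblePartialBoundary plus the hard-coded '--boundary' marker are invented for the example.

// Hypothetical standalone illustration of keeping a possible partial boundary.
const boundaryBytes = new TextEncoder().encode('--boundary');

function keepPossiblePartialBoundary(buffer: Uint8Array): Uint8Array {
    // A boundary may begin at the very end of this chunk and finish in the
    // next one, so carrying over the last boundary-length bytes is enough.
    if (buffer.length > boundaryBytes.length) {
        return buffer.slice(buffer.length - boundaryBytes.length);
    }
    return buffer;
}

// '--boundary' arrives split across two reads:
const read1 = new TextEncoder().encode('...row data...--bou');
const read2 = new TextEncoder().encode('ndary\r\nnext part');

const carried = keepPossiblePartialBoundary(read1);
const combined = new Uint8Array(carried.length + read2.length);
combined.set(carried);
combined.set(read2, carried.length);
// 'combined' now contains the complete '--boundary' marker, so a scan over
// the next buffer can find it even though it was split between reads.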

src/store/reducers/query/query.ts

Lines changed: 10 additions & 10 deletions
@@ -168,27 +168,27 @@ const slice = createSlice({
                     };
                 }
             }
-            // Convert and append new rows
+            // Process rows in batches
             if (
                 rows &&
                 rows.length > 0 &&
                 state.result.data.resultSets?.[resultIndex]?.columns
             ) {
-                const currentRows = state.result.data.resultSets[resultIndex].result;
-                let currentIndex = currentRows?.length || 1;
+                const currentRows = state.result.data.resultSets[resultIndex].result || [];
+                let currentIndex = currentRows.length || 1;

+                // Process all rows at once instead of one by one
                 const indexedRows = rows.map((row) => [currentIndex++, ...row]);
-
                 const formattedRows = parseResult(
                     indexedRows,
                     state.result.data.resultSets[resultIndex].columns,
                 );

-                if (state.result.data.resultSets[resultIndex].result) {
-                    state.result.data.resultSets[resultIndex].result.push(...formattedRows);
-                } else {
-                    state.result.data.resultSets[resultIndex].result = formattedRows;
-                }
+                // Update result set with all formatted rows at once
+                state.result.data.resultSets[resultIndex].result = [
+                    ...(state.result.data.resultSets[resultIndex].result || []),
+                    ...formattedRows,
+                ];
             }
         } else if (isQueryResponseChunk(chunk)) {
             state.result.isLoading = false;
@@ -259,7 +259,7 @@ interface QueryStats {
     endTime?: string | number;
 }

-const DEFAULT_STREAM_CHUNK_SIZE = 2;
+const DEFAULT_STREAM_CHUNK_SIZE = 10000;
 const DEFAULT_CONCURRENT_RESULTS = false;

 export const queryApi = api.injectEndpoints({
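Two things change here: rows from each streamed chunk are indexed and appended to the result set in a single assignment rather than per-row pushes, and the default stream chunk size grows from 2 to 10000 rows. The sketch below is a simplified, hypothetical version of that append step; the real reducer mutates an Immer draft inside createSlice and formats rows through parseResult first, and appendBatch is an invented name.

// Hypothetical, simplified version of the batched append logic.
type Row = unknown[];

function appendBatch(existing: Row[] | undefined, incoming: Row[]): Row[] {
    // Continue the running row index from the rows already stored.
    let currentIndex = existing?.length || 1;
    const indexedRows = incoming.map((row) => [currentIndex++, ...row]);

    // One concatenation per chunk rather than one push per row.
    return [...(existing || []), ...indexedRows];
}

// Each streamed chunk contributes a whole batch of rows:
let result: Row[] | undefined;
result = appendBatch(result, [['a'], ['b']]); // [[1, 'a'], [2, 'b']]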

src/utils/StreamDataBatcher.ts

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+import {isStreamDataChunk} from '../store/reducers/query/utils';
+import type {StreamDataChunk, StreamingChunk} from '../types/store/streaming';
+
+const DEFAULT_BATCH_SIZE = 1000;
+
+export class StreamDataBatcher {
+    private mergedChunks: Record<number, StreamDataChunk> = {}; // resultIndex -> mergedChunk
+    private readonly batchSize: number;
+
+    constructor(options = {batchSize: DEFAULT_BATCH_SIZE}) {
+        this.batchSize = options.batchSize;
+    }
+
+    addChunk(chunk: StreamingChunk): StreamingChunk | null {
+        if (!isStreamDataChunk(chunk)) {
+            return chunk;
+        }
+
+        const {
+            meta: {result_index: resultIndex},
+        } = chunk;
+
+        if (!this.mergedChunks[resultIndex]) {
+            this.mergedChunks[resultIndex] = {
+                ...chunk,
+                result: {
+                    rows: [],
+                    columns: chunk.result.columns,
+                },
+            };
+        }
+
+        const mergedChunk = this.mergedChunks[resultIndex];
+
+        const rows = mergedChunk.result.rows || [];
+        if (chunk.result.rows) {
+            rows.push(...chunk.result.rows);
+        }
+
+        // If we have enough rows, return the merged chunk and reset
+        if (rows.length >= this.batchSize) {
+            const result = {
+                ...mergedChunk,
+                result: {
+                    rows: mergedChunk.result.rows?.slice() || [],
+                    columns: mergedChunk.result.columns,
+                },
+            };
+            mergedChunk.result.rows = [];
+            return result;
+        }
+
+        return null; // Not enough rows yet
+    }
+
+    flush(): StreamingChunk[] {
+        const results = Object.values(this.mergedChunks);
+        this.mergedChunks = {};
+        return results;
+    }
+}
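A rough usage sketch for the batcher, mirroring how streaming.ts drives it above. The declared streamChunks and handleChunk are placeholders for the multipart parser output and the onChunk callback, the import paths follow the new file's own layout and would differ for another consumer, and the chunks are assumed to satisfy isStreamDataChunk with meta.result_index and result.rows/result.columns as the class reads them.

// Illustrative only: placeholders stand in for the parser output and callback.
import {StreamDataBatcher} from '../utils/StreamDataBatcher';

import type {StreamingChunk} from '../types/store/streaming';

declare const streamChunks: StreamingChunk[];
declare function handleChunk(chunk: StreamingChunk): void;

const batcher = new StreamDataBatcher({batchSize: 1000});

for (const chunk of streamChunks) {
    // For stream data chunks, addChunk returns null while rows for that
    // chunk's result_index are still accumulating, and the merged chunk once
    // batchSize is reached; non-stream-data chunks are returned unchanged.
    const batched = batcher.addChunk(chunk);
    if (batched) {
        handleChunk(batched);
    }
}

// When the stream ends (or right before a query response chunk is emitted),
// flush whatever is still buffered: one merged chunk per result_index.
for (const remaining of batcher.flush()) {
    handleChunk(remaining);
}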
