Skip to content

Commit 7ea5974

Browse files
committed
feat: multiple responses
1 parent 85d95c0 commit 7ea5974

File tree

2 files changed

+35
-20
lines changed

2 files changed

+35
-20
lines changed

src/services/api/viewer.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import type {DescribeTopicResult} from '../../types/api/topic';
3838
import type {TEvVDiskStateResponse} from '../../types/api/vdisk';
3939
import type {TUserToken} from '../../types/api/whoami';
4040
import type {QuerySyntax, TransactionMode} from '../../types/store/query';
41-
import type {StreamingChunk} from '../../types/store/streaming';
41+
import type {StreamDataChunk, StreamingChunk} from '../../types/store/streaming';
4242
import {BINARY_DATA_IN_PLAIN_TEXT_DISPLAY} from '../../utils/constants';
4343
import type {Nullable} from '../../utils/typecheckers';
4444
import {parseMultipart} from '../parsers/parseMultipart';
@@ -386,7 +386,8 @@ export class ViewerAPI extends BaseYdbAPI {
386386
transaction_mode?: TransactionMode;
387387
timeout?: Timeout;
388388
limit_rows?: number;
389-
output_chunk_max_size?: number;
389+
output_chunk_max_size: number;
390+
concurrent_results: number;
390391
},
391392
{
392393
concurrentId,
@@ -409,7 +410,6 @@ export class ViewerAPI extends BaseYdbAPI {
409410
...params,
410411
base64,
411412
schema: 'multipart',
412-
output_chunk_max_size: params.output_chunk_max_size || 1000,
413413
},
414414
{
415415
concurrentId,
@@ -430,32 +430,38 @@ export class ViewerAPI extends BaseYdbAPI {
430430
lastProcessedLength,
431431
});
432432
lastProcessedLength = currentLength;
433-
let mergedChunk = null;
433+
const mergedChunks = new Map<number, StreamDataChunk>();
434434

435435
for (const chunk of chunks) {
436436
if (isSessionChunk(chunk)) {
437437
const traceId = response
438438
.getResponseHeader('traceresponse')
439439
?.split('-')[1];
440+
440441
chunk.meta.trace_id = traceId;
441442
onChunk(chunk);
442443
} else if (isQueryResponseChunk(chunk)) {
443-
if (mergedChunk) {
444-
onChunk?.(mergedChunk);
445-
mergedChunk = null;
444+
for (const [index, mergedChunk] of mergedChunks) {
445+
onChunk(mergedChunk);
446+
mergedChunks.delete(index);
446447
}
448+
447449
onChunk(chunk);
448450
} else if (isStreamDataChunk(chunk)) {
451+
const resultIndex = chunk.meta.result_index;
452+
const mergedChunk = mergedChunks.get(resultIndex);
453+
449454
if (mergedChunk) {
450455
mergedChunk.result.rows.push(...chunk.result.rows);
451456
} else {
452-
mergedChunk = chunk;
457+
mergedChunks.set(resultIndex, chunk);
453458
}
454459
}
455460
}
456461

457-
if (mergedChunk) {
458-
onChunk?.(mergedChunk);
462+
for (const [index, mergedChunk] of mergedChunks) {
463+
onChunk(mergedChunk);
464+
mergedChunks.delete(index);
459465
}
460466
},
461467
},

src/store/reducers/query/query.ts

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,26 +148,31 @@ const slice = createSlice({
148148
} else if (isStreamDataChunk(chunk)) {
149149
const {
150150
result: {columns, rows},
151+
meta: {result_index: resultIndex},
151152
} = chunk;
152153

153-
if (columns && !state.result.data.resultSets?.[0]?.columns?.length) {
154-
if (state.result.data.resultSets) {
155-
state.result.data.resultSets[0].columns = columns;
154+
if (columns && !state.result.data.resultSets?.[resultIndex]?.columns?.length) {
155+
if (!state.result.data.resultSets) {
156+
state.result.data.resultSets = [];
157+
}
158+
159+
if (state.result.data.resultSets[resultIndex]) {
160+
state.result.data.resultSets[resultIndex].columns = columns;
156161
} else {
157-
state.result.data.resultSets = [{columns, result: []}];
162+
state.result.data.resultSets[resultIndex] = {columns, result: []};
158163
}
159164
}
160165
// Convert and append new rows
161-
if (rows.length > 0 && state.result.data.resultSets?.[0]?.columns) {
166+
if (rows.length > 0 && state.result.data.resultSets?.[resultIndex]?.columns) {
162167
const formattedRows = parseResult(
163168
rows,
164-
state.result.data.resultSets[0].columns,
169+
state.result.data.resultSets[resultIndex].columns,
165170
);
166171

167-
if (state.result.data.resultSets[0].result) {
168-
state.result.data.resultSets[0].result.push(...formattedRows);
172+
if (state.result.data.resultSets[resultIndex].result) {
173+
state.result.data.resultSets[resultIndex].result.push(...formattedRows);
169174
} else {
170-
state.result.data.resultSets[0].result = formattedRows;
175+
state.result.data.resultSets[resultIndex].result = formattedRows;
171176
}
172177
}
173178
} else if (isQueryResponseChunk(chunk)) {
@@ -239,6 +244,9 @@ interface QueryStats {
239244
endTime?: string | number;
240245
}
241246

247+
const DEFAULT_STREAM_CHUNK_SIZE = 2;
248+
const DEFAULT_CONCURRENT_RESULTS = 1;
249+
242250
export const queryApi = api.injectEndpoints({
243251
endpoints: (build) => ({
244252
useStreamQuery: build.mutation<null, SendQueryParams>({
@@ -273,7 +281,8 @@ export const queryApi = api.injectEndpoints({
273281
timeout: isNumeric(querySettings.timeout)
274282
? Number(querySettings.timeout) * 1000
275283
: undefined,
276-
output_chunk_max_size: 1000,
284+
output_chunk_max_size: DEFAULT_STREAM_CHUNK_SIZE,
285+
concurrent_results: DEFAULT_CONCURRENT_RESULTS,
277286
},
278287
{
279288
signal,

0 commit comments

Comments
 (0)