Skip to content

Commit 5ed8479

Browse files
committed
feat: streaming api
1 parent 0c21217 commit 5ed8479

File tree

7 files changed

+252
-442
lines changed

7 files changed

+252
-442
lines changed

src/services/api/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {OperationAPI} from './operation';
66
import {PDiskAPI} from './pdisk';
77
import {SchemeAPI} from './scheme';
88
import {StorageAPI} from './storage';
9+
import {StreamingAPI} from './streaming';
910
import {TabletsAPI} from './tablets';
1011
import {TraceAPI} from './trace';
1112
import {VDiskAPI} from './vdisk';
@@ -17,6 +18,7 @@ export class YdbEmbeddedAPI {
1718
pdisk: PDiskAPI;
1819
scheme: SchemeAPI;
1920
storage: StorageAPI;
21+
streaming: StreamingAPI;
2022
tablets: TabletsAPI;
2123
trace: TraceAPI;
2224
vdisk: VDiskAPI;
@@ -32,6 +34,7 @@ export class YdbEmbeddedAPI {
3234
this.pdisk = new PDiskAPI({config});
3335
this.scheme = new SchemeAPI({config});
3436
this.storage = new StorageAPI({config});
37+
this.streaming = new StreamingAPI({config});
3538
this.tablets = new TabletsAPI({config});
3639
this.trace = new TraceAPI({config});
3740
this.vdisk = new VDiskAPI({config});

src/services/api/streaming.ts

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import {isSessionChunk} from '../../store/reducers/query/utils';
2+
import type {Actions, TracingLevel} from '../../types/api/query';
3+
import type {QuerySyntax, StatisticsMode} from '../../types/store/query';
4+
import type {SessionChunk, StreamingChunk} from '../../types/store/streaming';
5+
import {BINARY_DATA_IN_PLAIN_TEXT_DISPLAY} from '../../utils/constants';
6+
import {MultipartStreamParser} from '../parsers/parseMultipartStream';
7+
import {settingsManager} from '../settings';
8+
9+
import {BaseYdbAPI} from './base';
10+
11+
export interface StreamQueryParams {
12+
query?: string;
13+
database?: string;
14+
action?: Actions;
15+
syntax?: QuerySyntax;
16+
stats?: StatisticsMode;
17+
tracingLevel?: TracingLevel;
18+
transaction_mode?: string;
19+
timeout?: number;
20+
limit_rows?: number;
21+
output_chunk_max_size: number;
22+
concurrent_results?: boolean;
23+
}
24+
25+
export interface StreamQueryOptions {
26+
signal?: AbortSignal;
27+
onChunk: (chunk: StreamingChunk) => void;
28+
}
29+
30+
export class StreamingAPI extends BaseYdbAPI {
31+
async streamQuery(params: StreamQueryParams, options: StreamQueryOptions) {
32+
const base64 = !settingsManager.readUserSettingsValue(
33+
BINARY_DATA_IN_PLAIN_TEXT_DISPLAY,
34+
true,
35+
);
36+
37+
let traceId: string | undefined = '';
38+
39+
const streamParser = new MultipartStreamParser((chunk: StreamingChunk) => {
40+
if (isSessionChunk(chunk)) {
41+
const sessionChunk = chunk as SessionChunk;
42+
sessionChunk.meta = {
43+
event: sessionChunk.meta.event,
44+
node_id: sessionChunk.meta.node_id,
45+
query_id: sessionChunk.meta.query_id,
46+
session_id: sessionChunk.meta.session_id,
47+
trace_id: traceId,
48+
};
49+
}
50+
options.onChunk(chunk);
51+
});
52+
53+
const queryParams = new URLSearchParams();
54+
// Add only string/number params
55+
Object.entries(params).forEach(([key, value]) => {
56+
if (value !== undefined) {
57+
queryParams.set(key, String(value));
58+
}
59+
});
60+
queryParams.set('base64', String(base64));
61+
queryParams.set('schema', 'multipart');
62+
63+
const headers = new Headers({
64+
Accept: 'multipart/x-mixed-replace',
65+
});
66+
67+
if (params.tracingLevel) {
68+
headers.set('X-Trace-Verbosity', String(params.tracingLevel));
69+
}
70+
71+
const response = await fetch(`${this.getPath('/viewer/query')}?${queryParams}`, {
72+
method: 'GET',
73+
signal: options.signal,
74+
headers,
75+
});
76+
77+
if (!response.ok) {
78+
throw new Error(`HTTP error! status: ${response.status}`);
79+
}
80+
81+
if (!response.body) {
82+
throw new Error('Response body is null');
83+
}
84+
85+
const reader = response.body.getReader();
86+
traceId = response.headers.get('traceresponse')?.split('-')[1];
87+
88+
try {
89+
for (;;) {
90+
const {done, value} = await reader.read();
91+
if (done) {
92+
break;
93+
}
94+
streamParser.processChunk(value);
95+
}
96+
} finally {
97+
reader.releaseLock();
98+
}
99+
}
100+
}

src/services/api/viewer.ts

Lines changed: 0 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
11
import type {PlanToSvgQueryParams} from '../../store/reducers/planToSvg';
2-
import {
3-
isQueryResponseChunk,
4-
isSessionChunk,
5-
isStreamDataChunk,
6-
} from '../../store/reducers/query/utils';
72
import type {TMetaInfo} from '../../types/api/acl';
83
import type {TQueryAutocomplete} from '../../types/api/autocomplete';
94
import type {CapabilitiesResponse} from '../../types/api/capabilities';
@@ -38,10 +33,8 @@ import type {DescribeTopicResult} from '../../types/api/topic';
3833
import type {TEvVDiskStateResponse} from '../../types/api/vdisk';
3934
import type {TUserToken} from '../../types/api/whoami';
4035
import type {QuerySyntax, TransactionMode} from '../../types/store/query';
41-
import type {StreamDataChunk, StreamingChunk} from '../../types/store/streaming';
4236
import {BINARY_DATA_IN_PLAIN_TEXT_DISPLAY} from '../../utils/constants';
4337
import type {Nullable} from '../../utils/typecheckers';
44-
import {parseMultipart} from '../parsers/parseMultipart';
4538
import {settingsManager} from '../settings';
4639

4740
import {BaseYdbAPI} from './base';
@@ -375,97 +368,6 @@ export class ViewerAPI extends BaseYdbAPI {
375368
);
376369
}
377370

378-
streamQuery<Action extends Actions>(
379-
params: {
380-
query?: string;
381-
database?: string;
382-
action?: Action;
383-
syntax?: QuerySyntax;
384-
stats?: Stats;
385-
tracingLevel?: TracingLevel;
386-
transaction_mode?: TransactionMode;
387-
timeout?: Timeout;
388-
limit_rows?: number;
389-
output_chunk_max_size: number;
390-
concurrent_results?: boolean;
391-
},
392-
{
393-
concurrentId,
394-
signal,
395-
onChunk,
396-
}: AxiosOptions & {
397-
onChunk: (chunk: StreamingChunk) => void;
398-
},
399-
) {
400-
const base64 = !settingsManager.readUserSettingsValue(
401-
BINARY_DATA_IN_PLAIN_TEXT_DISPLAY,
402-
true,
403-
);
404-
405-
let lastProcessedLength = 0;
406-
407-
return this.get<string>(
408-
this.getPath('/viewer/query'),
409-
{
410-
...params,
411-
base64,
412-
schema: 'multipart',
413-
},
414-
{
415-
concurrentId,
416-
timeout: params.timeout,
417-
requestConfig: {
418-
signal,
419-
'axios-retry': {retries: 0}, // No retries for streaming
420-
responseType: 'arraybuffer',
421-
},
422-
headers: {
423-
...(params.tracingLevel ? {'X-Trace-Verbosity': params.tracingLevel} : {}),
424-
Accept: 'multipart/x-mixed-replace',
425-
},
426-
onDownloadProgress: (progressEvent) => {
427-
const xhr = progressEvent.event.target as XMLHttpRequest;
428-
const {chunks, lastProcessedLength: currentLength} = parseMultipart({
429-
response: xhr.response,
430-
lastProcessedLength,
431-
});
432-
lastProcessedLength = currentLength;
433-
const mergedChunks = new Map<number, StreamDataChunk>();
434-
435-
for (const chunk of chunks) {
436-
if (isSessionChunk(chunk)) {
437-
const traceId = xhr.getResponseHeader('traceresponse')?.split('-')[1];
438-
439-
chunk.meta.trace_id = traceId;
440-
onChunk(chunk);
441-
} else if (isQueryResponseChunk(chunk)) {
442-
for (const [index, mergedChunk] of mergedChunks) {
443-
onChunk(mergedChunk);
444-
mergedChunks.delete(index);
445-
}
446-
447-
onChunk(chunk);
448-
} else if (isStreamDataChunk(chunk)) {
449-
const resultIndex = chunk.meta.result_index;
450-
const mergedChunk = mergedChunks.get(resultIndex);
451-
452-
if (mergedChunk && mergedChunk.result.rows && chunk.result.rows) {
453-
mergedChunk.result.rows.push(...chunk.result.rows);
454-
} else {
455-
mergedChunks.set(resultIndex, chunk);
456-
}
457-
}
458-
}
459-
460-
for (const [index, mergedChunk] of mergedChunks) {
461-
onChunk(mergedChunk);
462-
mergedChunks.delete(index);
463-
}
464-
},
465-
},
466-
);
467-
}
468-
469371
getHotKeys(
470372
{path, database, enableSampling}: {path: string; database: string; enableSampling: boolean},
471373
{concurrentId, signal}: AxiosOptions = {},

0 commit comments

Comments
 (0)